KAFKA-18274 Failed to restart controller in testing due to closed socket channel [1/2] (#18310)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Peter Lee 2024-12-25 20:52:06 +08:00 committed by GitHub
parent 88adb94c81
commit c7c1364b0f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 6 additions and 3 deletions

View File

@@ -26,6 +26,7 @@ import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, Delegati
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
import org.apache.kafka.common.utils.{LogContext, Utils}
@@ -155,6 +156,11 @@ class ControllerServer(
() => featuresPublisher.features()
)
// metrics will be set to null when closing a controller, so we should recreate it for testing
if (sharedServer.metrics == null){
sharedServer.metrics = new Metrics()
}
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
socketServer = new SocketServer(config,

View File

@@ -31,7 +31,6 @@ import org.apache.kafka.common.config.ConfigResource.Type
import org.apache.kafka.common.errors.{InvalidPartitionsException, PolicyViolationException, UnsupportedVersionException}
import org.apache.kafka.common.message.DescribeClusterRequestData
import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.quota.ClientQuotaAlteration.Op
@@ -122,8 +121,6 @@ class KRaftClusterTest {
val config = controller.sharedServer.controllerConfig.props
config.asInstanceOf[util.HashMap[String,String]].put(SocketServerConfigs.LISTENERS_CONFIG, s"CONTROLLER://localhost:$port")
controller.sharedServer.controllerConfig.updateCurrentConfig(new KafkaConfig(config))
// metrics will be set to null when closing a controller, so we should recreate it for testing
controller.sharedServer.metrics = new Metrics()
// restart controller
controller.startup()