KAFKA-18790 Fix testCustomQuotaCallback (#18906)

Frequently updating the trust store can cause unexpected termination of the AsyncConsumer background thread.

1. To resolve this issue, reuse the same AdminClient instead of recreating it.
2. Add error logging when fail to initialize resources for the consumer network thread.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Ming-Yen Chung 2025-02-15 03:07:59 +08:00 committed by GitHub
parent 79e853d68e
commit e828767062
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 12 additions and 14 deletions

View File

@ -109,6 +109,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
log.error("Unexpected error caught in consumer network thread", e); log.error("Unexpected error caught in consumer network thread", e);
} }
} }
} catch (final Throwable e) {
log.error("Failed to initialize resources for consumer network thread", e);
} finally { } finally {
cleanup(); cleanup();
} }

View File

@ -40,9 +40,7 @@ import java.util.Properties
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.{lang, util} import java.{lang, util}
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._ import scala.jdk.CollectionConverters._
import scala.util.Using
class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
@ -57,8 +55,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
private val kafkaClientSaslMechanism = "SCRAM-SHA-256" private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
private val adminClients = new ArrayBuffer[Admin]()
private var producerWithoutQuota: KafkaProducer[Array[Byte], Array[Byte]] = _ private var producerWithoutQuota: KafkaProducer[Array[Byte], Array[Byte]] = _
private var admin: Admin = _
val defaultRequestQuota = 1000 val defaultRequestQuota = 1000
val defaultProduceQuota = 2000 * 1000 * 1000 val defaultProduceQuota = 2000 * 1000 * 1000
@ -82,7 +80,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
@AfterEach @AfterEach
override def tearDown(): Unit = { override def tearDown(): Unit = {
adminClients.foreach(_.close()) if (admin != null) admin.close()
GroupedUserQuotaCallback.tearDown() GroupedUserQuotaCallback.tearDown()
super.tearDown() super.tearDown()
closeSasl() closeSasl()
@ -196,14 +194,12 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
topic: String, topic: String,
listenerName: ListenerName = listenerName listenerName: ListenerName = listenerName
): Unit = { ): Unit = {
Using.resource(createAdminClient()) { admin =>
TestUtils.deleteTopicWithAdmin( TestUtils.deleteTopicWithAdmin(
admin = admin, admin = createAdminClient(),
topic = topic, topic = topic,
brokers = aliveBrokers, brokers = aliveBrokers,
controllers = controllerServers) controllers = controllerServers)
} }
}
private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = { private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = {
val assignment = (0 until numPartitions).map { i => i -> Seq(leader) }.toMap val assignment = (0 until numPartitions).map { i => i -> Seq(leader) }.toMap
@ -218,6 +214,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
} }
private def createAdminClient(): Admin = { private def createAdminClient(): Admin = {
if (admin != null) return admin
val config = new util.HashMap[String, Object] val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
clientSecurityProps("admin-client").asInstanceOf[util.Map[Object, Object]].forEach { (key, value) => clientSecurityProps("admin-client").asInstanceOf[util.Map[Object, Object]].forEach { (key, value) =>
@ -225,9 +222,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
} }
config.put(SaslConfigs.SASL_JAAS_CONFIG, config.put(SaslConfigs.SASL_JAAS_CONFIG,
JaasModule.scramLoginModule(JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD).toString) JaasModule.scramLoginModule(JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD).toString)
val adminClient = Admin.create(config) admin = Admin.create(config)
adminClients += adminClient admin
adminClient
} }
private def produceWithoutThrottle(topic: String, numRecords: Int): Unit = { private def produceWithoutThrottle(topic: String, numRecords: Int): Unit = {