mirror of https://github.com/apache/kafka.git
KAFKA-18790 Fix testCustomQuotaCallback (#18906)
Frequently updating the trust store can cause unexpected termination of the AsyncConsumer background thread. 1. To resolve this issue, reuse the same AdminClient instead of recreating it. 2. Add error logging when fail to initialize resources for the consumer network thread. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
e6609d3acd
commit
6abb4775b9
|
@ -109,6 +109,8 @@ public class ConsumerNetworkThread extends KafkaThread implements Closeable {
|
|||
log.error("Unexpected error caught in consumer network thread", e);
|
||||
}
|
||||
}
|
||||
} catch (final Throwable e) {
|
||||
log.error("Failed to initialize resources for consumer network thread", e);
|
||||
} finally {
|
||||
cleanup();
|
||||
}
|
||||
|
|
|
@ -40,9 +40,7 @@ import java.util.Properties
|
|||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
|
||||
import java.{lang, util}
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.jdk.CollectionConverters._
|
||||
import scala.util.Using
|
||||
|
||||
class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
||||
|
||||
|
@ -57,8 +55,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
|||
private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
|
||||
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
|
||||
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
|
||||
private val adminClients = new ArrayBuffer[Admin]()
|
||||
private var producerWithoutQuota: KafkaProducer[Array[Byte], Array[Byte]] = _
|
||||
private var admin: Admin = _
|
||||
|
||||
val defaultRequestQuota = 1000
|
||||
val defaultProduceQuota = 2000 * 1000 * 1000
|
||||
|
@ -82,7 +80,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
|||
|
||||
@AfterEach
|
||||
override def tearDown(): Unit = {
|
||||
adminClients.foreach(_.close())
|
||||
if (admin != null) admin.close()
|
||||
GroupedUserQuotaCallback.tearDown()
|
||||
super.tearDown()
|
||||
closeSasl()
|
||||
|
@ -196,13 +194,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
|||
topic: String,
|
||||
listenerName: ListenerName = listenerName
|
||||
): Unit = {
|
||||
Using.resource(createAdminClient()) { admin =>
|
||||
TestUtils.deleteTopicWithAdmin(
|
||||
admin = admin,
|
||||
topic = topic,
|
||||
brokers = aliveBrokers,
|
||||
controllers = controllerServers)
|
||||
}
|
||||
TestUtils.deleteTopicWithAdmin(
|
||||
admin = createAdminClient(),
|
||||
topic = topic,
|
||||
brokers = aliveBrokers,
|
||||
controllers = controllerServers)
|
||||
}
|
||||
|
||||
private def createTopic(topic: String, numPartitions: Int, leader: Int): Unit = {
|
||||
|
@ -218,6 +214,7 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
|||
}
|
||||
|
||||
private def createAdminClient(): Admin = {
|
||||
if (admin != null) return admin
|
||||
val config = new util.HashMap[String, Object]
|
||||
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
|
||||
clientSecurityProps("admin-client").asInstanceOf[util.Map[Object, Object]].forEach { (key, value) =>
|
||||
|
@ -225,9 +222,8 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup {
|
|||
}
|
||||
config.put(SaslConfigs.SASL_JAAS_CONFIG,
|
||||
JaasModule.scramLoginModule(JaasTestUtils.KAFKA_SCRAM_ADMIN, JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD).toString)
|
||||
val adminClient = Admin.create(config)
|
||||
adminClients += adminClient
|
||||
adminClient
|
||||
admin = Admin.create(config)
|
||||
admin
|
||||
}
|
||||
|
||||
private def produceWithoutThrottle(topic: String, numRecords: Int): Unit = {
|
||||
|
|
Loading…
Reference in New Issue