KAFKA-15421: fix network thread leak in testThreadPoolResize (#14320)

In SocketServerTest, we create a SocketServer and call enableRequestProcessing during test class initialization. That is fine on its own, since we shut the server down in @AfterEach. The problem is that 2 tests in this suite are disabled: when JUnit runs a disabled test it still instantiates the test class, so the field initializers run, but it never invokes @AfterEach. That leaks 2 network threads.

Compare with the error message from the DynamicBrokerReconfigurationTest#testThreadPoolResize failure:

org.opentest4j.AssertionFailedError: Invalid threads: expected 6, got 8: List(data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, data-plane-kafka-socket-acceptor-ListenerName(INTERNAL)-SSL-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0, data-plane-kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-0, data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-0) ==> expected: <true> but was: <false>

The 2 unexpected network threads are the ones leaked from SocketServerTest. The fix moves enableRequestProcessing out of the field initializer and into @BeforeEach, which only runs for tests that actually execute, and re-enables testThreadPoolResize and executeTieredStorageTest, whose @Disabled annotations are removed below.
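The lifecycle pitfall is easy to reproduce outside of Kafka. A minimal sketch, assuming JUnit 5's default per-method lifecycle, which instantiates the test class before evaluating @Disabled (the class name, thread name, and timings below are made up for illustration):

import java.util.concurrent.atomic.AtomicBoolean
import org.junit.jupiter.api.{AfterEach, Disabled, Test}

class LeakyLifecycleTest {
  private val running = new AtomicBoolean(true)

  // Field initializer: runs on every instantiation of the test class,
  // including the instantiation JUnit performs for @Disabled tests.
  private val worker: Thread = {
    val t = new Thread(() => while (running.get()) Thread.sleep(100))
    t.setName("leaky-worker")
    t.start()
    t
  }

  @AfterEach
  def tearDown(): Unit = {
    // Never invoked for a @Disabled test, so "leaky-worker" is never stopped.
    running.set(false)
    worker.join()
  }

  @Disabled("skipped, but the field initializer above already started a thread")
  @Test
  def someDisabledTest(): Unit = ()
}

SocketServerTest followed exactly this shape: the SocketServer field initializer started acceptor threads, and the shutdown lived in @AfterEach.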

Reviewers: Satish Duggana <satishd@apache.org>, Christo Lolov <lolovc@amazon.com>, Divij Vaidya <diviv@amazon.com>, Kamal Chandraprakash <kchandraprakash@uber.com>, Chris Egerton <chrise@aiven.io>
Author: Luke Chen
Date: 2023-09-03 16:16:54 +08:00 (committed by GitHub)
Parent: cc53889aaa
Commit: da99879df7
3 changed files with 1 addition and 4 deletions


@@ -798,7 +798,6 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     consumer.commitSync()
   }

-  @Disabled
   @Test
   def testThreadPoolResize(): Unit = {
     val requestHandlerPrefix = "data-plane-kafka-request-handler-"
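As an aside, the failure quoted earlier comes from a thread-count check. A hypothetical Scala helper in the same style (not the test's actual code; assumes Scala 2.13's CollectionConverters):

import scala.jdk.CollectionConverters._

// List the names of live JVM threads starting with a given prefix, e.g.
// "data-plane-kafka-socket-acceptor-". Going through toList keeps duplicate
// names, which is how two leaked acceptors turn an expected 6 into 8.
def threadsWithPrefix(prefix: String): List[String] =
  Thread.getAllStackTraces.keySet.asScala.toList
    .map(_.getName)
    .filter(_.startsWith(prefix))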


@@ -81,7 +81,6 @@ class SocketServerTest {
   private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true, false,
     () => new Features(MetadataVersion.latest(), Collections.emptyMap[String, java.lang.Short], 0, true))
   val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
-  server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
   val sockets = new ArrayBuffer[Socket]
   private val kafkaLogger = org.apache.log4j.LogManager.getLogger("kafka")
@@ -94,6 +93,7 @@ class SocketServerTest {

   @BeforeEach
   def setUp(): Unit = {
+    server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
     // Run the tests with TRACE logging to exercise request logging path
     logLevelToRestore = kafkaLogger.getLevel
     kafkaLogger.setLevel(Level.TRACE)
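The change above is the usual fix for this pitfall: pair the resource start with its stop in matching lifecycle callbacks, since @BeforeEach and @AfterEach are both skipped for disabled tests. A hedged sketch with hypothetical names, not Kafka code:

import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

class BalancedLifecycleTest {
  private var worker: Thread = _

  @BeforeEach
  def setUp(): Unit = {
    // Started here rather than in a field initializer, so @Disabled tests
    // (which skip @BeforeEach and @AfterEach alike) start nothing.
    worker = new Thread(() => {
      try Thread.sleep(Long.MaxValue)
      catch { case _: InterruptedException => () } // exit when interrupted
    })
    worker.start()
  }

  @AfterEach
  def tearDown(): Unit = {
    worker.interrupt()
    worker.join()
  }

  @Test
  def someTest(): Unit = ()
}

Moving enableRequestProcessing into setUp applies the same pairing to SocketServerTest.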


@@ -32,7 +32,6 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -155,7 +154,6 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
         context = new TieredStorageTestContext(this);
     }

-    @Disabled
     @Test
     public void executeTieredStorageTest() {
         TieredStorageTestBuilder builder = new TieredStorageTestBuilder();