From d46c3f259cce25c43f20fba3943d5cb34ed909ea Mon Sep 17 00:00:00 2001 From: David Mao <47232755+splett2@users.noreply.github.com> Date: Wed, 3 May 2023 17:09:43 -0700 Subject: [PATCH] MINOR: Reduce number of threads created for integration test brokers (#13655) The integration tests seem to create an unnecessarily large number of threads. This reduces the number of threads created per integration test harness broker. Reviewers: Luke Chen . Justine Olshan --- .../test/java/org/apache/kafka/test/TestUtils.java | 12 ------------ .../kafka/network/DynamicNumNetworkThreadsTest.scala | 1 + .../scala/unit/kafka/network/SocketServerTest.scala | 1 + .../unit/kafka/server/DynamicBrokerConfigTest.scala | 4 +++- core/src/test/scala/unit/kafka/utils/TestUtils.scala | 3 +++ 5 files changed, 8 insertions(+), 13 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index b63a4f20969..be13e8e7a62 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.record.UnalignedRecords; import org.apache.kafka.common.requests.ByteBufferChannel; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.utils.Exit; -import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -150,17 +149,6 @@ public class TestUtils { public static File tempFile(final String prefix, final String suffix) throws IOException { final File file = Files.createTempFile(prefix, suffix).toFile(); file.deleteOnExit(); - - // Note that we don't use Exit.addShutdownHook here because it allows for the possibility of accidently - // overriding the behaviour of this hook leading to leaked files. - Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon("delete-temp-file-shutdown-hook", () -> { - try { - Utils.delete(file); - } catch (IOException e) { - log.error("Error deleting {}", file.getAbsolutePath(), e); - } - })); - return file; } diff --git a/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala b/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala index b9f05d5aa87..a76e6cf1ad0 100644 --- a/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala +++ b/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala @@ -38,6 +38,7 @@ class DynamicNumNetworkThreadsTest extends BaseRequestTest { properties.put(KafkaConfig.ListenersProp, s"$internal://localhost:0, $external://localhost:0") properties.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$internal:PLAINTEXT, $external:PLAINTEXT") properties.put(s"listener.name.${internal.toLowerCase}.${KafkaConfig.NumNetworkThreadsProp}", "2") + properties.put(KafkaConfig.NumNetworkThreadsProp, Defaults.NumNetworkThreads.toString) } @BeforeEach diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 5bab866de3e..5e9e84fd465 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -1989,6 +1989,7 @@ class SocketServerTest { val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL), trustStoreFile = Some(trustStoreFile)) sslProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0") + sslProps.put(KafkaConfig.NumNetworkThreadsProp, "1") sslProps } diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 76617426f97..77715bc8230 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -609,7 +609,9 @@ class DynamicBrokerConfigTest { val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new java.util.Properties()) - val oldConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)) + val initialProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) + initialProps.remove(KafkaConfig.BackgroundThreadsProp) + val oldConfig = KafkaConfig.fromProps(initialProps) val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig) dynamicBrokerConfig.initialize(Some(zkClient)) dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a499ac38344..fcbd4907216 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -353,6 +353,9 @@ object TestUtils extends Logging { if (!props.containsKey(KafkaConfig.GroupInitialRebalanceDelayMsProp)) props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") rack.foreach(props.put(KafkaConfig.RackProp, _)) + // Reduce number of threads per broker + props.put(KafkaConfig.NumNetworkThreadsProp, "2") + props.put(KafkaConfig.BackgroundThreadsProp, "2") if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) }) props ++= sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId")