mirror of https://github.com/apache/kafka.git
MINOR: Reduce number of threads created for integration test brokers (#13655)
The integration tests seem to create an unnecessarily large number of threads. This reduces the number of threads created per integration test harness broker. Reviewers: Luke Chen <showuon@gmail.com>. Justine Olshan <jolshan@confluent.io>
This commit is contained in:
parent
c08120f83f
commit
d46c3f259c
|
@ -29,7 +29,6 @@ import org.apache.kafka.common.record.UnalignedRecords;
|
||||||
import org.apache.kafka.common.requests.ByteBufferChannel;
|
import org.apache.kafka.common.requests.ByteBufferChannel;
|
||||||
import org.apache.kafka.common.requests.RequestHeader;
|
import org.apache.kafka.common.requests.RequestHeader;
|
||||||
import org.apache.kafka.common.utils.Exit;
|
import org.apache.kafka.common.utils.Exit;
|
||||||
import org.apache.kafka.common.utils.KafkaThread;
|
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -150,17 +149,6 @@ public class TestUtils {
|
||||||
public static File tempFile(final String prefix, final String suffix) throws IOException {
|
public static File tempFile(final String prefix, final String suffix) throws IOException {
|
||||||
final File file = Files.createTempFile(prefix, suffix).toFile();
|
final File file = Files.createTempFile(prefix, suffix).toFile();
|
||||||
file.deleteOnExit();
|
file.deleteOnExit();
|
||||||
|
|
||||||
// Note that we don't use Exit.addShutdownHook here because it allows for the possibility of accidently
|
|
||||||
// overriding the behaviour of this hook leading to leaked files.
|
|
||||||
Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon("delete-temp-file-shutdown-hook", () -> {
|
|
||||||
try {
|
|
||||||
Utils.delete(file);
|
|
||||||
} catch (IOException e) {
|
|
||||||
log.error("Error deleting {}", file.getAbsolutePath(), e);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
return file;
|
return file;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,7 @@ class DynamicNumNetworkThreadsTest extends BaseRequestTest {
|
||||||
properties.put(KafkaConfig.ListenersProp, s"$internal://localhost:0, $external://localhost:0")
|
properties.put(KafkaConfig.ListenersProp, s"$internal://localhost:0, $external://localhost:0")
|
||||||
properties.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$internal:PLAINTEXT, $external:PLAINTEXT")
|
properties.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$internal:PLAINTEXT, $external:PLAINTEXT")
|
||||||
properties.put(s"listener.name.${internal.toLowerCase}.${KafkaConfig.NumNetworkThreadsProp}", "2")
|
properties.put(s"listener.name.${internal.toLowerCase}.${KafkaConfig.NumNetworkThreadsProp}", "2")
|
||||||
|
properties.put(KafkaConfig.NumNetworkThreadsProp, Defaults.NumNetworkThreads.toString)
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
|
|
|
@ -1989,6 +1989,7 @@ class SocketServerTest {
|
||||||
val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
|
val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
|
||||||
trustStoreFile = Some(trustStoreFile))
|
trustStoreFile = Some(trustStoreFile))
|
||||||
sslProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
|
sslProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
|
||||||
|
sslProps.put(KafkaConfig.NumNetworkThreadsProp, "1")
|
||||||
sslProps
|
sslProps
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -609,7 +609,9 @@ class DynamicBrokerConfigTest {
|
||||||
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
|
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
|
||||||
when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new java.util.Properties())
|
when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new java.util.Properties())
|
||||||
|
|
||||||
val oldConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092))
|
val initialProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
|
||||||
|
initialProps.remove(KafkaConfig.BackgroundThreadsProp)
|
||||||
|
val oldConfig = KafkaConfig.fromProps(initialProps)
|
||||||
val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
|
val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
|
||||||
dynamicBrokerConfig.initialize(Some(zkClient))
|
dynamicBrokerConfig.initialize(Some(zkClient))
|
||||||
dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)
|
dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)
|
||||||
|
|
|
@ -353,6 +353,9 @@ object TestUtils extends Logging {
|
||||||
if (!props.containsKey(KafkaConfig.GroupInitialRebalanceDelayMsProp))
|
if (!props.containsKey(KafkaConfig.GroupInitialRebalanceDelayMsProp))
|
||||||
props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
|
props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
|
||||||
rack.foreach(props.put(KafkaConfig.RackProp, _))
|
rack.foreach(props.put(KafkaConfig.RackProp, _))
|
||||||
|
// Reduce number of threads per broker
|
||||||
|
props.put(KafkaConfig.NumNetworkThreadsProp, "2")
|
||||||
|
props.put(KafkaConfig.BackgroundThreadsProp, "2")
|
||||||
|
|
||||||
if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
|
if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
|
||||||
props ++= sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId")
|
props ++= sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId")
|
||||||
|
|
Loading…
Reference in New Issue