diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 079887c361d..94f0fa6ef4e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -43,7 +43,6 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.SinkUtils; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.WorkerHandle; -import org.apache.kafka.network.SocketServerConfigs; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -57,8 +56,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileOutputStream; -import java.io.IOException; -import java.net.ServerSocket; import java.nio.file.Path; import java.util.Collection; import java.util.Collections; @@ -247,8 +244,6 @@ public class ConnectWorkerIntegrationTest { ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000)); - useFixedBrokerPort(); - // start the clusters connect = connectBuilder.build(); connect.start(); @@ -813,8 +808,6 @@ public class ConnectWorkerIntegrationTest { workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "0"); workerProps.put(METADATA_RECOVERY_STRATEGY_CONFIG, MetadataRecoveryStrategy.NONE.name); - useFixedBrokerPort(); - connect = connectBuilder .numWorkers(1) .build(); @@ -1431,23 +1424,6 @@ public class ConnectWorkerIntegrationTest { return props; } - private void useFixedBrokerPort() throws IOException { - // Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners - // config to get a random free port because in this test we want to stop the Kafka broker and then bring it - // back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka - // and continue functioning normally. If we were to use port 0 here, the Kafka broker would most likely listen - // on a different random free port the second time it is started. Note that we can only use the static port - // because we have a single broker setup in this test. - int listenerPort; - try (ServerSocket s = new ServerSocket(0)) { - listenerPort = s.getLocalPort(); - } - brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, String.format("EXTERNAL://localhost:%d,CONTROLLER://localhost:0", listenerPort)); - connectBuilder - .numBrokers(1) - .brokerProps(brokerProps); - } - public static class EmptyTaskConfigsConnector extends SinkConnector { @Override public String version() { diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java index 097f8c3e26d..76fbab01a50 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java @@ -206,35 +206,42 @@ public class KafkaClusterTestKit implements AutoCloseable { } } + private Optional maybeSetupJaasFile() throws Exception { + if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) { + File file = JaasUtils.writeJaasContextsToFile(Set.of( + new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME, + List.of( + JaasModule.plainLoginModule( + JaasUtils.KAFKA_PLAIN_ADMIN, + JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD, + true, + Map.of( + JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD, + JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD) + ) + ) + ) + )); + JaasUtils.refreshJavaLoginConfigParam(file); + return Optional.of(file); + } + return Optional.empty(); + } + public KafkaClusterTestKit build() throws Exception { Map controllers = new HashMap<>(); Map brokers = new HashMap<>(); Map jointServers = new HashMap<>(); File baseDirectory = null; - File jaasFile = null; - - if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) { - jaasFile = JaasUtils.writeJaasContextsToFile(Set.of( - new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME, - List.of( - JaasModule.plainLoginModule( - JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD, - true, - Map.of( - JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD, - JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD) - ) - ) - ) - )); - JaasUtils.refreshJavaLoginConfigParam(jaasFile); - } - + Optional jaasFile = maybeSetupJaasFile(); try { baseDirectory = new File(nodes.baseDirectory()); for (TestKitNode node : nodes.controllerNodes().values()) { socketFactoryManager.getOrCreatePortForListener(node.id(), controllerListenerName); } + for (TestKitNode node : nodes.brokerNodes().values()) { + socketFactoryManager.getOrCreatePortForListener(node.id(), brokerListenerName); + } for (TestKitNode node : nodes.controllerNodes().values()) { setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList()); KafkaConfig config = createNodeConfig(node); @@ -308,7 +315,7 @@ public class KafkaClusterTestKit implements AutoCloseable { baseDirectory, faultHandlerFactory, socketFactoryManager, - jaasFile == null ? Optional.empty() : Optional.of(jaasFile)); + jaasFile); } private String listeners(int node) { diff --git a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java index 27fd4a3e0a6..c013d155977 100644 --- a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java +++ b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java @@ -29,7 +29,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.config.ConfigResource; @@ -354,6 +356,28 @@ public class ClusterTestExtensionsTest { } } + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 1) + public void testBrokerRestart(ClusterInstance cluster) throws ExecutionException, InterruptedException { + final String topicName = "topic"; + try (Admin admin = cluster.admin(); + Producer producer = cluster.producer(Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()))) { + admin.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get(); + + cluster.waitForTopic(topicName, 1); + + cluster.brokers().values().forEach(broker -> { + broker.shutdown(); + broker.awaitShutdown(); + broker.startup(); + }); + + RecordMetadata recordMetadata0 = producer.send(new ProducerRecord<>(topicName, 0, "key 0", "value 0")).get(); + assertEquals(0, recordMetadata0.offset()); + } + } + @ClusterTest(types = {Type.KRAFT}) public void testControllerRestart(ClusterInstance cluster) throws ExecutionException, InterruptedException { try (Admin admin = cluster.admin()) {