KAFKA-18275 Restarting broker in testing should use the same port (#18381)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Peter Lee 2025-02-07 03:02:36 +08:00 committed by GitHub
parent 780640f383
commit 0621c0b4de
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 51 additions and 44 deletions

View File

@ -43,7 +43,6 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@ -57,8 +56,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
@ -247,8 +244,6 @@ public class ConnectWorkerIntegrationTest {
ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000));
useFixedBrokerPort();
// start the clusters
connect = connectBuilder.build();
connect.start();
@ -813,8 +808,6 @@ public class ConnectWorkerIntegrationTest {
workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "0");
workerProps.put(METADATA_RECOVERY_STRATEGY_CONFIG, MetadataRecoveryStrategy.NONE.name);
useFixedBrokerPort();
connect = connectBuilder
.numWorkers(1)
.build();
@ -1431,23 +1424,6 @@ public class ConnectWorkerIntegrationTest {
return props;
}
private void useFixedBrokerPort() throws IOException {
// Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners
// config to get a random free port because in this test we want to stop the Kafka broker and then bring it
// back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka
// and continue functioning normally. If we were to use port 0 here, the Kafka broker would most likely listen
// on a different random free port the second time it is started. Note that we can only use the static port
// because we have a single broker setup in this test.
int listenerPort;
try (ServerSocket s = new ServerSocket(0)) {
listenerPort = s.getLocalPort();
}
brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, String.format("EXTERNAL://localhost:%d,CONTROLLER://localhost:0", listenerPort));
connectBuilder
.numBrokers(1)
.brokerProps(brokerProps);
}
public static class EmptyTaskConfigsConnector extends SinkConnector {
@Override
public String version() {

View File

@ -206,35 +206,42 @@ public class KafkaClusterTestKit implements AutoCloseable {
}
}
private Optional<File> maybeSetupJaasFile() throws Exception {
if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
File file = JaasUtils.writeJaasContextsToFile(Set.of(
new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME,
List.of(
JaasModule.plainLoginModule(
JaasUtils.KAFKA_PLAIN_ADMIN,
JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD,
true,
Map.of(
JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD,
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD)
)
)
)
));
JaasUtils.refreshJavaLoginConfigParam(file);
return Optional.of(file);
}
return Optional.empty();
}
public KafkaClusterTestKit build() throws Exception {
Map<Integer, ControllerServer> controllers = new HashMap<>();
Map<Integer, BrokerServer> brokers = new HashMap<>();
Map<Integer, SharedServer> jointServers = new HashMap<>();
File baseDirectory = null;
File jaasFile = null;
if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
jaasFile = JaasUtils.writeJaasContextsToFile(Set.of(
new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME,
List.of(
JaasModule.plainLoginModule(
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD,
true,
Map.of(
JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD,
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD)
)
)
)
));
JaasUtils.refreshJavaLoginConfigParam(jaasFile);
}
Optional<File> jaasFile = maybeSetupJaasFile();
try {
baseDirectory = new File(nodes.baseDirectory());
for (TestKitNode node : nodes.controllerNodes().values()) {
socketFactoryManager.getOrCreatePortForListener(node.id(), controllerListenerName);
}
for (TestKitNode node : nodes.brokerNodes().values()) {
socketFactoryManager.getOrCreatePortForListener(node.id(), brokerListenerName);
}
for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
KafkaConfig config = createNodeConfig(node);
@ -308,7 +315,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
baseDirectory,
faultHandlerFactory,
socketFactoryManager,
jaasFile == null ? Optional.empty() : Optional.of(jaasFile));
jaasFile);
}
private String listeners(int node) {

View File

@ -29,7 +29,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
@ -354,6 +356,28 @@ public class ClusterTestExtensionsTest {
}
}
@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 1)
public void testBrokerRestart(ClusterInstance cluster) throws ExecutionException, InterruptedException {
final String topicName = "topic";
try (Admin admin = cluster.admin();
Producer<String, String> producer = cluster.producer(Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()))) {
admin.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get();
cluster.waitForTopic(topicName, 1);
cluster.brokers().values().forEach(broker -> {
broker.shutdown();
broker.awaitShutdown();
broker.startup();
});
RecordMetadata recordMetadata0 = producer.send(new ProducerRecord<>(topicName, 0, "key 0", "value 0")).get();
assertEquals(0, recordMetadata0.offset());
}
}
@ClusterTest(types = {Type.KRAFT})
public void testControllerRestart(ClusterInstance cluster) throws ExecutionException, InterruptedException {
try (Admin admin = cluster.admin()) {