From 61f61d62403e6b547a3a333895395ceff6d7afe5 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Mon, 29 Jul 2024 16:43:55 +0200 Subject: [PATCH] KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode (#16599) Reviewers: Omnia Ibrahim , Mickael Maison , Chia-Ping Tsai --- build.gradle | 5 +- checkstyle/import-control.xml | 3 + .../MirrorConnectorsIntegrationBaseTest.java | 93 +++-- .../MirrorConnectorsIntegrationSSLTest.java | 12 +- ...hCustomForwardingAdminIntegrationTest.java | 42 ++- .../integration/BlockingConnectorTest.java | 4 +- .../ConnectWorkerIntegrationTest.java | 50 ++- .../ConnectorClientPolicyIntegrationTest.java | 2 +- .../ConnectorRestartApiIntegrationTest.java | 9 +- .../ConnectorTopicsIntegrationTest.java | 4 +- .../ConnectorValidationIntegrationTest.java | 2 - .../ExactlyOnceSourceIntegrationTest.java | 93 ++++- .../ExampleConnectIntegrationTest.java | 4 +- .../InternalTopicsIntegrationTest.java | 2 +- .../OffsetsApiIntegrationTest.java | 1 - ...alanceSourceConnectorsIntegrationTest.java | 4 +- .../RestExtensionIntegrationTest.java | 4 +- .../SessionedProtocolIntegrationTest.java | 4 +- .../SinkConnectorsIntegrationTest.java | 4 +- .../SourceConnectorsIntegrationTest.java | 4 +- .../TransformationIntegrationTest.java | 4 +- .../util/clusters/EmbeddedConnect.java | 4 +- .../util/clusters/EmbeddedConnectCluster.java | 8 +- .../clusters/EmbeddedConnectStandalone.java | 2 +- .../util/clusters/EmbeddedKafkaCluster.java | 327 ++++++------------ .../kafka/testkit/KafkaClusterTestKit.java | 29 +- 26 files changed, 372 insertions(+), 348 deletions(-) diff --git a/build.gradle b/build.gradle index 79f61289bbd..7028c409f83 100644 --- a/build.gradle +++ b/build.gradle @@ -3193,13 +3193,14 @@ project(':connect:runtime') { testImplementation project(':core') testImplementation project(':server') testImplementation project(':metadata') + testImplementation project(':server-common') testImplementation project(':core').sourceSets.test.output testImplementation project(':server-common') testImplementation project(':server') testImplementation project(':group-coordinator') testImplementation project(':storage') testImplementation project(':connect:test-plugins') - testImplementation project(':group-coordinator') + testImplementation project(':server-common').sourceSets.test.output testImplementation libs.junitJupiter testImplementation libs.mockitoCore @@ -3317,6 +3318,7 @@ project(':connect:file') { testImplementation project(':connect:runtime').sourceSets.test.output testImplementation project(':core') testImplementation project(':core').sourceSets.test.output + testImplementation project(':server-common').sourceSets.test.output } javadoc { @@ -3418,6 +3420,7 @@ project(':connect:mirror') { testImplementation project(':core') testImplementation project(':core').sourceSets.test.output testImplementation project(':server') + testImplementation project(':server-common').sourceSets.test.output testRuntimeOnly project(':connect:runtime') testRuntimeOnly libs.slf4jReload4j diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index c6ef445c044..08c45d0aa38 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -592,6 +592,7 @@ + @@ -612,6 +613,8 @@ + + diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java index 3b6f52ca191..80f8d2c3bda 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java @@ -31,7 +31,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.config.TopicConfig; @@ -56,6 +56,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; import org.apache.kafka.connect.util.clusters.UngracefulShutdownException; +import org.apache.kafka.test.TestCondition; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -78,7 +79,6 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -86,6 +86,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongUnaryOperator; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_CLIENT_ROLE_PREFIX; @@ -301,12 +302,30 @@ public class MirrorConnectorsIntegrationBaseTest { MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS)); MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)); - // make sure the topic is auto-created in the other cluster - waitForTopicCreated(primary, reverseTopic1); - waitForTopicCreated(backup, backupTopic1); waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal"); - assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), backupTopic1, TopicConfig.CLEANUP_POLICY_CONFIG), - "topic config was not synced"); + + TestCondition assertBackupTopicConfig = () -> { + String compactPolicy = getTopicConfig(backup.kafka(), backupTopic1, TopicConfig.CLEANUP_POLICY_CONFIG); + assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, compactPolicy, "topic config was not synced"); + return true; + }; + + if (replicateBackupToPrimary) { + // make sure the topics are auto-created in the other cluster + waitForTopicCreated(primary, reverseTopic1); + waitForTopicCreated(backup, backupTopic1); + assertBackupTopicConfig.conditionMet(); + } else { + // The backup and reverse topics are identical to the topics we created while setting up the test; + // we don't have to wait for them to be created, but there might be a small delay between + // now and when MM2 is able to sync the config for the backup topic + waitForCondition( + assertBackupTopicConfig, + 10_000, // Topic config sync interval is one second; this should be plenty of time + "topic config was not synced in time" + ); + } + createAndTestNewTopicWithConfigFilter(); assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(), @@ -429,12 +448,12 @@ public class MirrorConnectorsIntegrationBaseTest { try (Consumer primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) { waitForConsumingAllRecords(primaryConsumer, expectedRecords); } - + // one way replication from primary to backup mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); mm2Config = new MirrorMakerConfig(mm2Props); waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS); - + // sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record Thread.sleep(TimeUnit.SECONDS.toMillis(3)); @@ -445,7 +464,7 @@ public class MirrorConnectorsIntegrationBaseTest { backupTopic)) { waitForConsumingAllRecords(backupConsumer, expectedRecords); } - + try (Admin backupClient = backup.kafka().createAdminClient()) { // retrieve the consumer group offset from backup cluster Map remoteOffsets = @@ -1189,14 +1208,11 @@ public class MirrorConnectorsIntegrationBaseTest { * @param records Records to send in one parallel batch */ protected void produceMessages(Producer producer, List> records) { - List> futures = new ArrayList<>(); - for (ProducerRecord record : records) { - futures.add(producer.send(record)); - } Timer timer = Time.SYSTEM.timer(RECORD_PRODUCE_DURATION_MS); + try { - for (Future future : futures) { - future.get(timer.remainingMs(), TimeUnit.MILLISECONDS); + for (ProducerRecord record : records) { + producer.send(record).get(timer.remainingMs(), TimeUnit.MILLISECONDS); timer.update(); } } catch (ExecutionException | InterruptedException | TimeoutException e) { @@ -1397,13 +1413,46 @@ public class MirrorConnectorsIntegrationBaseTest { /* * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly */ - protected void warmUpConsumer(Map consumerProps) { - try (Consumer dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) { - dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); - dummyConsumer.commitSync(); - } - try (Consumer dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) { + protected final void warmUpConsumer(Map consumerProps) { + final String topic = "test-topic-1"; + warmUpConsumer("primary", primary.kafka(), consumerProps, topic); + warmUpConsumer("backup", backup.kafka(), consumerProps, topic); + } + + private void warmUpConsumer(String clusterName, EmbeddedKafkaCluster kafkaCluster, Map consumerProps, String topic) { + try (Consumer dummyConsumer = kafkaCluster.createConsumerAndSubscribeTo(consumerProps, topic)) { + // poll to ensure we've joined the group dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); + + // force the consumer to have a known position on every topic partition + // so that it will be able to commit offsets for that position + // (it's possible that poll returns before that has happened) + Set topicPartitionsPendingPosition = IntStream.range(0, NUM_PARTITIONS) + .mapToObj(partition -> new TopicPartition(topic, partition)) + .collect(Collectors.toSet()); + Timer positionTimer = Time.SYSTEM.timer(60_000); + while (!positionTimer.isExpired() && !topicPartitionsPendingPosition.isEmpty()) { + Set topicPartitionsWithPosition = new HashSet<>(); + + topicPartitionsPendingPosition.forEach(topicPartition -> { + try { + positionTimer.update(); + dummyConsumer.position(topicPartition, Duration.ofMillis(positionTimer.remainingMs())); + topicPartitionsWithPosition.add(topicPartition); + } catch (KafkaException e) { + log.warn("Failed to calculate consumer position for {} on cluster {}", topicPartition, clusterName); + } + }); + + topicPartitionsPendingPosition.removeAll(topicPartitionsWithPosition); + } + assertEquals( + Collections.emptySet(), + topicPartitionsPendingPosition, + "Failed to calculate consumer position for one or more partitions on cluster " + clusterName + " in time" + ); + + // And finally, commit offsets dummyConsumer.commitSync(); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java index 7999f5f9a4a..96a71710d0d 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.network.ConnectionMode; import org.apache.kafka.network.SocketServerConfigs; -import org.apache.kafka.server.config.ReplicationConfigs; import org.apache.kafka.test.TestSslUtils; import org.apache.kafka.test.TestUtils; @@ -42,25 +41,24 @@ public class MirrorConnectorsIntegrationSSLTest extends MirrorConnectorsIntegrat public void startClusters() throws Exception { Map sslConfig = TestSslUtils.createSslConfig(false, true, ConnectionMode.SERVER, TestUtils.tempFile(), "testCert"); // enable SSL on backup kafka broker - backupBrokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:0"); - backupBrokerProps.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL"); + backupBrokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "EXTERNAL:SSL,CONTROLLER:SSL"); backupBrokerProps.putAll(sslConfig); - + Properties sslProps = new Properties(); sslProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); sslProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value()); sslProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); - + // set SSL config for kafka connect worker backupWorkerProps.putAll(sslProps.entrySet().stream().collect(Collectors.toMap( e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue())))); - + mm2Props.putAll(sslProps.entrySet().stream().collect(Collectors.toMap( e -> BACKUP_CLUSTER_ALIAS + "." + e.getKey(), e -> String.valueOf(e.getValue())))); // set SSL config for producer used by source task in MM2 mm2Props.putAll(sslProps.entrySet().stream().collect(Collectors.toMap( e -> BACKUP_CLUSTER_ALIAS + ".producer." + e.getKey(), e -> String.valueOf(e.getValue())))); - + super.startClusters(); } } diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java index d94ce632ae1..853cd02f134 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePatternFilter; @@ -33,6 +34,9 @@ import org.apache.kafka.connect.mirror.MirrorMakerConfig; import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata; import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore; import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.server.config.KRaftConfigs; +import org.apache.kafka.server.config.ServerConfigs; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -62,23 +66,26 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ @Tag("integration") public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends MirrorConnectorsIntegrationBaseTest { + + private static final int TOPIC_ACL_SYNC_DURATION_MS = 30_000; private static final int FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS = 60_000; /* * enable ACL on brokers. */ protected static void enableAclAuthorizer(Properties brokerProps) { - brokerProps.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer"); - brokerProps.put("sasl.enabled.mechanisms", "PLAIN"); - brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN"); - brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); - brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0"); - brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required " - + "username=\"super\" " - + "password=\"super_pwd\" " - + "user_connector=\"connector_pwd\" " - + "user_super=\"super_pwd\";"); + brokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT"); + brokerProps.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, "org.apache.kafka.metadata.authorizer.StandardAuthorizer"); + brokerProps.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, "PLAIN"); + brokerProps.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN"); + brokerProps.put(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, "PLAIN"); + String listenerSaslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"super\" " + + "password=\"super_pwd\" " + + "user_connector=\"connector_pwd\" " + + "user_super=\"super_pwd\";"; + brokerProps.put("listener.name.external.plain.sasl.jaas.config", listenerSaslJaasConfig); + brokerProps.put("listener.name.controller.plain.sasl.jaas.config", listenerSaslJaasConfig); brokerProps.put("super.users", "User:super"); } @@ -293,6 +300,7 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi @Test public void testSyncTopicACLsUseProvidedForwardingAdmin() throws Exception { mm2Props.put("sync.topic.acls.enabled", "true"); + mm2Props.put("sync.topic.acls.interval.seconds", "1"); mm2Config = new MirrorMakerConfig(mm2Props); List aclBindings = Collections.singletonList( new AclBinding( @@ -324,8 +332,16 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi new AccessControlEntry("User:dummy", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW) ); - assertTrue(getAclBindings(backup.kafka(), "primary.test-topic-1").contains(expectedACLOnBackupCluster), "topic ACLs was synced"); - assertTrue(getAclBindings(primary.kafka(), "backup.test-topic-1").contains(expectedACLOnPrimaryCluster), "topic ACLs was synced"); + // In some rare cases replica topics are created before ACLs are synced, so retry logic is necessary + waitForCondition( + () -> { + assertTrue(getAclBindings(backup.kafka(), "primary.test-topic-1").contains(expectedACLOnBackupCluster), "topic ACLs are not synced on backup cluster"); + assertTrue(getAclBindings(primary.kafka(), "backup.test-topic-1").contains(expectedACLOnPrimaryCluster), "topic ACLs are not synced on primary cluster"); + return true; + }, + TOPIC_ACL_SYNC_DURATION_MS, + "Topic ACLs were not synced in time" + ); // expect to use FakeForwardingAdminWithLocalMetadata to update topic ACLs in FakeLocalMetadataStore.allAcls assertTrue(FakeLocalMetadataStore.aclBindings("dummy").containsAll(Arrays.asList(expectedACLOnBackupCluster, expectedACLOnPrimaryCluster))); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index ab5cf207fe8..90a0e96a78a 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -123,7 +123,7 @@ public class BlockingConnectorTest { @BeforeEach public void setup() throws Exception { - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster connect = new EmbeddedConnectCluster.Builder() .name("connect-cluster") .numWorkers(NUM_WORKERS) @@ -138,7 +138,7 @@ public class BlockingConnectorTest { @AfterEach public void close() { - // stop all Connect, Kafka and Zk threads. + // stop the Connect cluster and its backing Kafka cluster. connect.stop(); // unblock everything so that we don't leak threads after each test run Block.reset(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 96753b2100e..516d5a3f2cd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -43,6 +43,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.SinkUtils; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.WorkerHandle; +import org.apache.kafka.network.SocketServerConfigs; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -57,6 +58,8 @@ import org.slf4j.event.Level; import java.io.File; import java.io.FileOutputStream; +import java.io.IOException; +import java.net.ServerSocket; import java.nio.file.Path; import java.util.Collection; import java.util.Collections; @@ -118,6 +121,7 @@ public class ConnectWorkerIntegrationTest { private static final Logger log = LoggerFactory.getLogger(ConnectWorkerIntegrationTest.class); private static final int NUM_TOPIC_PARTITIONS = 3; + private static final long RECORD_TRANSFER_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(60); private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30); private static final int NUM_WORKERS = 3; private static final int NUM_TASKS = 4; @@ -142,7 +146,7 @@ public class ConnectWorkerIntegrationTest { brokerProps = new Properties(); brokerProps.put("auto.create.topics.enable", String.valueOf(false)); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster connectBuilder = new EmbeddedConnectCluster.Builder() .name("connect-cluster") .numWorkers(NUM_WORKERS) @@ -154,7 +158,7 @@ public class ConnectWorkerIntegrationTest { @AfterEach public void close(TestInfo testInfo) { log.info("Finished test {}", testInfo.getDisplayName()); - // stop all Connect, Kafka and Zk threads. + // stop the Connect cluster and its backing Kafka cluster. connect.stop(); } @@ -244,8 +248,11 @@ public class ConnectWorkerIntegrationTest { public void testBrokerCoordinator() throws Exception { ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000)); - connect = connectBuilder.workerProps(workerProps).build(); + + useFixedBrokerPort(); + // start the clusters + connect = connectBuilder.build(); connect.start(); int numTasks = 4; // create test topic @@ -263,7 +270,7 @@ public class ConnectWorkerIntegrationTest { // expect that the connector will be stopped once the coordinator is detected to be down StartAndStopLatch stopLatch = connectorHandle.expectedStops(1, false); - connect.kafka().stopOnlyKafka(); + connect.kafka().stopOnlyBrokers(); // Allow for the workers to discover that the coordinator is unavailable, wait is // heartbeat timeout * 2 + 4sec @@ -294,7 +301,7 @@ public class ConnectWorkerIntegrationTest { + CONNECTOR_SETUP_DURATION_MS + "ms"); StartAndStopLatch startLatch = connectorHandle.expectedStarts(1, false); - connect.kafka().startOnlyKafkaOnSamePorts(); + connect.kafka().restartOnlyBrokers(); // Allow for the kafka brokers to come back online Thread.sleep(TimeUnit.SECONDS.toMillis(10)); @@ -835,8 +842,10 @@ public class ConnectWorkerIntegrationTest { // Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to // be spuriously triggered after the group coordinator for a Connect cluster is bounced workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "0"); + + useFixedBrokerPort(); + connect = connectBuilder - .numBrokers(1) .numWorkers(1) .build(); connect.start(); @@ -854,7 +863,7 @@ public class ConnectWorkerIntegrationTest { // Bring down Kafka, which should cause some REST requests to fail log.info("Stopping Kafka cluster"); - connect.kafka().stopOnlyKafka(); + connect.kafka().stopOnlyBrokers(); // Try to reconfigure the connector, which should fail with a timeout error log.info("Trying to reconfigure connector while Kafka cluster is down"); @@ -863,7 +872,7 @@ public class ConnectWorkerIntegrationTest { "flushing updates to the status topic" ); log.info("Restarting Kafka cluster"); - connect.kafka().startOnlyKafkaOnSamePorts(); + connect.kafka().restartOnlyBrokers(); connect.assertions().assertExactlyNumBrokersAreUp(1, "Broker did not complete startup in time"); log.info("Kafka cluster is restarted"); @@ -1182,7 +1191,7 @@ public class ConnectWorkerIntegrationTest { NUM_TASKS, "Connector or its tasks did not start in time" ); - connectorHandle.awaitCommits(offsetCommitIntervalMs * 3); + connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS); connect.deleteConnector(CONNECTOR_NAME); @@ -1223,7 +1232,7 @@ public class ConnectWorkerIntegrationTest { NUM_TASKS, "Connector or its tasks did not start in time" ); - connectorHandle.awaitCommits(offsetCommitIntervalMs * 3); + connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS); // See if any new records got written to the old topic final long nextEndOffset = connect.kafka().endOffset(connectorTopicPartition); @@ -1282,7 +1291,7 @@ public class ConnectWorkerIntegrationTest { NUM_TASKS, "Connector or its tasks did not start in time" ); - connectorHandle.awaitCommits(offsetCommitIntervalMs * 3); + connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS); // Delete the secrets file, which should render the old task configs invalid assertTrue(secretsFile.delete(), "Failed to delete secrets file"); @@ -1307,7 +1316,7 @@ public class ConnectWorkerIntegrationTest { // Wait for at least one task to commit offsets after being restarted connectorHandle.expectedCommits(1); - connectorHandle.awaitCommits(offsetCommitIntervalMs * 3); + connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS); final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0)); assertTrue( @@ -1446,6 +1455,23 @@ public class ConnectWorkerIntegrationTest { return props; } + private void useFixedBrokerPort() throws IOException { + // Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners + // config to get a random free port because in this test we want to stop the Kafka broker and then bring it + // back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka + // and continue functioning normally. If we were to use port 0 here, the Kafka broker would most likely listen + // on a different random free port the second time it is started. Note that we can only use the static port + // because we have a single broker setup in this test. + int listenerPort; + try (ServerSocket s = new ServerSocket(0)) { + listenerPort = s.getLocalPort(); + } + brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, String.format("EXTERNAL://localhost:%d,CONTROLLER://localhost:0", listenerPort)); + connectBuilder + .numBrokers(1) + .brokerProps(brokerProps); + } + public static class EmptyTaskConfigsConnector extends SinkConnector { @Override public String version() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java index 03cd000a19f..a127f85b12d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java @@ -110,7 +110,7 @@ public class ConnectorClientPolicyIntegrationTest { Properties exampleBrokerProps = new Properties(); exampleBrokerProps.put("auto.create.topics.enable", "false"); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder() .name("connect-cluster") .numWorkers(NUM_WORKERS) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java index c0c64c505dc..1c398a22396 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java @@ -119,7 +119,6 @@ public class ConnectorRestartApiIntegrationTest { @AfterAll public static void close() { - // stop all Connect, Kafka and Zk threads. CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); } @@ -127,7 +126,7 @@ public class ConnectorRestartApiIntegrationTest { public void testRestartUnknownConnectorNoParams() throws Exception { String connectorName = "Unknown"; - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster startOrReuseConnectWithNumWorkers(ONE_WORKER); // Call the Restart API String restartEndpoint = connect.endpointForResource( @@ -148,7 +147,7 @@ public class ConnectorRestartApiIntegrationTest { private void restartUnknownConnector(boolean onlyFailed, boolean includeTasks) throws Exception { String connectorName = "Unknown"; - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster startOrReuseConnectWithNumWorkers(ONE_WORKER); // Call the Restart API String restartEndpoint = connect.endpointForResource( @@ -299,7 +298,7 @@ public class ConnectorRestartApiIntegrationTest { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); props.put("connector.start.inject.error", "true"); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster startOrReuseConnectWithNumWorkers(ONE_WORKER); // Try to start the connector and its single task. @@ -330,7 +329,7 @@ public class ConnectorRestartApiIntegrationTest { // setup up props for the source connector Map props = defaultSourceConnectorProps(TOPIC_NAME); tasksToFail.forEach(taskId -> props.put("task-" + taskId + ".start.inject.error", "true")); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster startOrReuseConnectWithNumWorkers(ONE_WORKER); // Try to start the connector and its single task. diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java index cdc0e3f3d66..fb4bbcdf408 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java @@ -90,7 +90,7 @@ public class ConnectorTopicsIntegrationTest { // setup Kafka broker properties brokerProps.put("auto.create.topics.enable", String.valueOf(false)); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster connectBuilder = new EmbeddedConnectCluster.Builder() .name("connect-cluster") .numWorkers(NUM_WORKERS) @@ -101,7 +101,7 @@ public class ConnectorTopicsIntegrationTest { @AfterEach public void close() { - // stop all Connect, Kafka and Zk threads. + // stop the Connect cluster and its backing Kafka cluster. connect.stop(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java index a82238600cb..2805504e360 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java @@ -80,7 +80,6 @@ public class ConnectorValidationIntegrationTest { TestPlugins.pluginPathJoined(testPlugins) ); - // build a Connect cluster backed by Kafka and Zk connect = new EmbeddedConnectCluster.Builder() .name("connector-validation-connect-cluster") .workerProps(workerProps) @@ -93,7 +92,6 @@ public class ConnectorValidationIntegrationTest { @AfterAll public static void close() { if (connect != null) { - // stop all Connect, Kafka and Zk threads. Utils.closeQuietly(connect::stop, "Embedded Connect cluster"); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java index 55a865d43cb..6bb12e8f178 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; @@ -42,6 +43,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.source.SourceRecord; @@ -50,6 +52,10 @@ import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.util.clusters.ConnectAssertions; import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.server.config.KRaftConfigs; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.test.NoRetryException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -116,6 +122,7 @@ public class ExactlyOnceSourceIntegrationTest { private static final int CONSUME_RECORDS_TIMEOUT_MS = 60_000; private static final int SOURCE_TASK_PRODUCE_TIMEOUT_MS = 30_000; + private static final int ACL_PROPAGATION_TIMEOUT_MS = 30_000; private static final int DEFAULT_NUM_WORKERS = 3; // Tests require that a minimum but not unreasonably large number of records are sourced. @@ -140,7 +147,7 @@ public class ExactlyOnceSourceIntegrationTest { brokerProps.put("transaction.state.log.replication.factor", "1"); brokerProps.put("transaction.state.log.min.isr", "1"); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by Kafka connectBuilder = new EmbeddedConnectCluster.Builder() .numWorkers(DEFAULT_NUM_WORKERS) .numBrokers(1) @@ -159,7 +166,7 @@ public class ExactlyOnceSourceIntegrationTest { @AfterEach public void close() { try { - // stop all Connect, Kafka and Zk threads. + // stop the Connect cluster and its backing Kafka cluster. connect.stop(); } finally { // Clear the handle for the connector. Fun fact: if you don't do this, your tests become quite flaky. @@ -624,17 +631,18 @@ public class ExactlyOnceSourceIntegrationTest { */ @Test public void testTasksFailOnInabilityToFence() throws Exception { - brokerProps.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer"); - brokerProps.put("sasl.enabled.mechanisms", "PLAIN"); - brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN"); - brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT"); - brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0"); - brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config", - "org.apache.kafka.common.security.plain.PlainLoginModule required " - + "username=\"super\" " - + "password=\"super_pwd\" " - + "user_connector=\"connector_pwd\" " - + "user_super=\"super_pwd\";"); + brokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT"); + brokerProps.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, "org.apache.kafka.metadata.authorizer.StandardAuthorizer"); + brokerProps.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, "PLAIN"); + brokerProps.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN"); + brokerProps.put(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, "PLAIN"); + String listenerSaslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"super\" " + + "password=\"super_pwd\" " + + "user_connector=\"connector_pwd\" " + + "user_super=\"super_pwd\";"; + brokerProps.put("listener.name.external.plain.sasl.jaas.config", listenerSaslJaasConfig); + brokerProps.put("listener.name.controller.plain.sasl.jaas.config", listenerSaslJaasConfig); brokerProps.put("super.users", "User:super"); Map superUserClientConfig = new HashMap<>(); @@ -694,12 +702,30 @@ public class ExactlyOnceSourceIntegrationTest { )).all().get(); } - StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax); - log.info("Bringing up connector with fresh slate; fencing should not be necessary"); connect.configureConnector(CONNECTOR_NAME, props); - assertConnectorStarted(connectorStart); - // Verify that the connector and its tasks have been able to start successfully + + // Hack: There is a small chance that our recent ACL updates for the connector have + // not yet been propagated across the entire Kafka cluster, and that our connector + // will fail on startup when it tries to list the end offsets of the worker's offsets topic + // So, we implement some retry logic here to add a layer of resiliency in that case + waitForCondition( + () -> { + ConnectorStateInfo status = connect.connectorStatus(CONNECTOR_NAME); + if ("RUNNING".equals(status.connector().state())) { + return true; + } else if ("FAILED".equals(status.connector().state())) { + log.debug("Restarting failed connector {}", CONNECTOR_NAME); + connect.restartConnector(CONNECTOR_NAME); + } + return false; + }, + ACL_PROPAGATION_TIMEOUT_MS, + "Connector was not able to start in time, " + + "or ACL updates were not propagated across the Kafka cluster soon enough" + ); + + // Also verify that the connector's tasks have been able to start successfully connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, tasksMax, "Connector and task should have started successfully"); log.info("Reconfiguring connector; fencing should be necessary, and tasks should fail to start"); @@ -725,8 +751,39 @@ public class ExactlyOnceSourceIntegrationTest { log.info("Restarting connector after tweaking its ACLs; fencing should succeed this time"); connect.restartConnectorAndTasks(CONNECTOR_NAME, false, true, false); + // Verify that the connector and its tasks have been able to restart successfully - connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, tasksMax, "Connector and task should have restarted successfully"); + // Use the same retry logic as above, in case there is a delay in the propagation of our ACL updates + waitForCondition( + () -> { + ConnectorStateInfo status = connect.connectorStatus(CONNECTOR_NAME); + boolean connectorRunning = "RUNNING".equals(status.connector().state()); + boolean allTasksRunning = status.tasks().stream() + .allMatch(t -> "RUNNING".equals(t.state())); + boolean expectedNumTasks = status.tasks().size() == tasksMax; + if (connectorRunning && allTasksRunning && expectedNumTasks) { + return true; + } else { + if (!connectorRunning) { + if ("FAILED".equals(status.connector().state())) { + // Only task failures are expected ;if the connector has failed, something + // else is wrong and we should fail the test immediately + throw new NoRetryException( + new AssertionError("Connector " + CONNECTOR_NAME + " has failed unexpectedly") + ); + } + } + // Restart all failed tasks + status.tasks().stream() + .filter(t -> "FAILED".equals(t.state())) + .map(ConnectorStateInfo.TaskState::id) + .forEach(t -> connect.restartTask(CONNECTOR_NAME, t)); + return false; + } + }, + ConnectAssertions.CONNECTOR_SETUP_DURATION_MS, + "Connector and task should have restarted successfully" + ); } /** diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java index 0dbea6da2be..d131fd4efc6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java @@ -79,7 +79,7 @@ public class ExampleConnectIntegrationTest { Properties exampleBrokerProps = new Properties(); exampleBrokerProps.put("auto.create.topics.enable", "false"); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster connect = new EmbeddedConnectCluster.Builder() .name("connect-cluster") .numWorkers(NUM_WORKERS) @@ -100,7 +100,7 @@ public class ExampleConnectIntegrationTest { // delete connector handle RuntimeHandles.get().deleteConnector(CONNECTOR_NAME); - // stop all Connect, Kafka and Zk threads. + // stop the Connect cluster and its backing Kafka cluster. connect.stop(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java index ed7a786e1d6..a625dc983e8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java @@ -64,7 +64,7 @@ public class InternalTopicsIntegrationTest { @AfterEach public void close() { - // stop all Connect, Kafka and Zk threads. + // stop the Connect workers and Kafka brokers. connect.stop(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index 4571e584502..0ac514039f0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -120,7 +120,6 @@ public class OffsetsApiIntegrationTest { @AfterAll public static void close() { - // stop all Connect, Kafka and Zk threads. CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop); // wait for all blocked threads created while testing zombie task scenarios to finish BlockingConnectorTest.Block.join(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java index 178697a14d0..ff028928c25 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java @@ -87,7 +87,7 @@ public class RebalanceSourceConnectorsIntegrationTest { Properties brokerProps = new Properties(); brokerProps.put("auto.create.topics.enable", "false"); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster connect = new EmbeddedConnectCluster.Builder() .name("connect-cluster") .numWorkers(NUM_WORKERS) @@ -103,7 +103,7 @@ public class RebalanceSourceConnectorsIntegrationTest { @AfterEach public void close(TestInfo testInfo) { log.info("Finished test {}", testInfo.getDisplayName()); - // stop all Connect, Kafka and Zk threads. + // stop the Connect cluster and its backing Kafka cluster. connect.stop(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java index b0e1cae1fae..8ccc31baa86 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -67,7 +67,7 @@ public class RestExtensionIntegrationTest { Map workerProps = new HashMap<>(); workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName()); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster connect = new EmbeddedConnectCluster.Builder() .name("connect-cluster") .numWorkers(NUM_WORKERS) @@ -135,7 +135,7 @@ public class RestExtensionIntegrationTest { @AfterEach public void close() { - // stop all Connect, Kafka and Zk threads. + // stop the Connect cluster and its backing Kafka cluster. connect.stop(); IntegrationTestRestExtension.instance = null; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java index a482c352709..7969471918e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java @@ -64,7 +64,7 @@ public class SessionedProtocolIntegrationTest { Map workerProps = new HashMap<>(); workerProps.put(CONNECT_PROTOCOL_CONFIG, ConnectProtocolCompatibility.SESSIONED.protocol()); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster connect = new EmbeddedConnectCluster.Builder() .name("connect-cluster") .numWorkers(2) @@ -81,7 +81,7 @@ public class SessionedProtocolIntegrationTest { @AfterEach public void close() { - // stop all Connect, Kafka and Zk threads. + // stop the Connect cluster and its backing Kafka cluster. connect.stop(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java index c67d0ca30f1..961eeb70f99 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SinkConnectorsIntegrationTest.java @@ -75,7 +75,7 @@ public class SinkConnectorsIntegrationTest { brokerProps.put("auto.create.topics.enable", "false"); brokerProps.put("delete.topic.enable", "true"); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster connect = new EmbeddedConnectCluster.Builder() .name("connect-cluster") .numWorkers(NUM_WORKERS) @@ -90,7 +90,7 @@ public class SinkConnectorsIntegrationTest { // delete connector handle RuntimeHandles.get().deleteConnector(CONNECTOR_NAME); - // stop all Connect, Kafka and Zk threads. + // stop the Connect cluster and its backing Kafka cluster. connect.stop(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java index 91416cb8441..aa1dc6bcf94 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SourceConnectorsIntegrationTest.java @@ -80,7 +80,7 @@ public class SourceConnectorsIntegrationTest { // setup Kafka broker properties brokerProps.put("auto.create.topics.enable", String.valueOf(false)); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster connectBuilder = new EmbeddedConnectCluster.Builder() .name("connect-cluster") .numWorkers(NUM_WORKERS) @@ -91,7 +91,7 @@ public class SourceConnectorsIntegrationTest { @AfterEach public void close() { - // stop all Connect, Kafka and Zk threads. + // stop the Connect cluster and its backing Kafka cluster. connect.stop(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java index 5c8c2f5630f..105d238d56f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TransformationIntegrationTest.java @@ -84,7 +84,7 @@ public class TransformationIntegrationTest { // This is required because tests in this class also test per-connector topic creation with transformations brokerProps.put("auto.create.topics.enable", "false"); - // build a Connect cluster backed by Kafka and Zk + // build a Connect cluster backed by a Kafka KRaft cluster connect = new EmbeddedConnectCluster.Builder() .name("connect-cluster") .numWorkers(NUM_WORKERS) @@ -105,7 +105,7 @@ public class TransformationIntegrationTest { // delete connector handle RuntimeHandles.get().deleteConnector(CONNECTOR_NAME); - // stop all Connect, Kafka and Zk threads. + // stop the Connect cluster and its backing Kafka cluster. connect.stop(); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java index 38f61b06b2e..018c9a40b05 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java @@ -121,7 +121,7 @@ abstract class EmbeddedConnect { }; /** - * Start the Connect cluster and the embedded Kafka and Zookeeper cluster, + * Start the Connect cluster and the embedded Kafka KRaft cluster, * and wait for the Kafka and Connect clusters to become healthy. */ public void start() { @@ -163,7 +163,7 @@ abstract class EmbeddedConnect { } /** - * Stop the connect cluster and the embedded Kafka and Zookeeper cluster. + * Stop the Connect cluster and the embedded Kafka KRaft cluster. * Clean up any temp directories created locally. * * @throws RuntimeException if Kafka brokers fail to stop diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java index 8272c294bd3..eab33ef2678 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -41,9 +41,11 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STA import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG; /** - * Start an embedded connect cluster. Internally, this class will spin up a Kafka and Zk cluster, set up any tmp - * directories, and clean them up on exit. Methods on the same {@code EmbeddedConnectCluster} are - * not guaranteed to be thread-safe. + * Start an embedded Connect cluster that can be used for integration tests. Internally, this class also spins up a + * backing Kafka KRaft cluster for the Connect cluster leveraging {@link kafka.testkit.KafkaClusterTestKit}. Methods + * on the same {@code EmbeddedConnectCluster} are not guaranteed to be thread-safe. This class also provides various + * utility methods to perform actions on the Connect cluster such as connector creation, config validation, connector + * restarts, pause / resume, connector deletion etc. */ public class EmbeddedConnectCluster extends EmbeddedConnect { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java index 3cbbca5ebdf..66ce78d0d1b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java @@ -47,7 +47,7 @@ import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_C import static org.apache.kafka.connect.runtime.standalone.StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG; /** - * Start a standalone embedded connect worker. Internally, this class will spin up a Kafka and Zk cluster, + * Start a standalone embedded connect worker. Internally, this class will spin up a Kafka cluster, * set up any tmp directories. and clean them up on exit. Methods on the same * {@code EmbeddedConnectStandalone} are not guaranteed to be thread-safe. */ diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java index e02c69fc43c..37d130bbf8b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java @@ -16,12 +16,9 @@ */ package org.apache.kafka.connect.util.clusters; -import kafka.cluster.EndPoint; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.CoreUtils; -import kafka.utils.TestUtils; -import kafka.zk.EmbeddedZookeeper; +import kafka.server.BrokerServer; +import kafka.testkit.KafkaClusterTestKit; +import kafka.testkit.TestKitNodes; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; @@ -49,23 +46,17 @@ import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.errors.InvalidReplicationFactorException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; -import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; import org.apache.kafka.metadata.BrokerState; import org.apache.kafka.network.SocketServerConfigs; -import org.apache.kafka.server.config.ServerConfigs; -import org.apache.kafka.server.config.ZkConfigs; -import org.apache.kafka.storage.internals.log.CleanerConfig; +import org.apache.kafka.server.config.ServerLogConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -83,6 +74,7 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -95,98 +87,62 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG; -import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; -import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** - * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for - * integration tests. + * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the + * specified number of brokers and the specified broker properties. This can be used for integration tests and is + * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be + * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data, + * consume data etc. */ public class EmbeddedKafkaCluster { private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); - private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); + private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120); - // Kafka Config - private final KafkaServer[] brokers; + private final KafkaClusterTestKit cluster; private final Properties brokerConfig; - private final Time time = Time.SYSTEM; - private final int[] currentBrokerPorts; - private final String[] currentBrokerLogDirs; - private final boolean hasListenerConfig; + private final Map clientConfigs; - final Map clientConfigs; - - private EmbeddedZookeeper zookeeper = null; - private ListenerName listenerName = new ListenerName("PLAINTEXT"); private KafkaProducer producer; - public EmbeddedKafkaCluster(final int numBrokers, - final Properties brokerConfig) { + public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) { this(numBrokers, brokerConfig, Collections.emptyMap()); } public EmbeddedKafkaCluster(final int numBrokers, - final Properties brokerConfig, - final Map clientConfigs) { - brokers = new KafkaServer[numBrokers]; - currentBrokerPorts = new int[numBrokers]; - currentBrokerLogDirs = new String[numBrokers]; - this.brokerConfig = brokerConfig; - // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether - // a listener config is defined during initialization in order to know if it's - // safe to override it - hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null; + final Properties brokerConfig, + final Map clientConfigs) { + addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers); + try { + KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder() + .setCombined(true) + .setNumBrokerNodes(numBrokers) + // Reduce number of controllers for faster startup + // We may make this configurable in the future if there's a use case for it + .setNumControllerNodes(1) + .build() + ); + brokerConfig.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, v)); + cluster = clusterBuilder.build(); + cluster.nonFatalFaultHandler().setIgnore(true); + } catch (Exception e) { + throw new ConnectException("Failed to create test Kafka cluster", e); + } + this.brokerConfig = brokerConfig; this.clientConfigs = clientConfigs; } - /** - * Starts the Kafka cluster alone using the ports that were assigned during initialization of - * the harness. - * - * @throws ConnectException if a directory to store the data cannot be created - */ - public void startOnlyKafkaOnSamePorts() { - doStart(); - } - public void start() { - // pick a random port - zookeeper = new EmbeddedZookeeper(); - Arrays.fill(currentBrokerPorts, 0); - Arrays.fill(currentBrokerLogDirs, null); - doStart(); - } - - private void doStart() { - brokerConfig.put(ZkConfigs.ZK_CONNECT_CONFIG, zKConnectString()); - - putIfAbsent(brokerConfig, ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, true); - putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0); - putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) brokers.length); - putIfAbsent(brokerConfig, AUTO_CREATE_TOPICS_ENABLE_CONFIG, false); - // reduce the size of the log cleaner map to reduce test memory usage - putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L); - - Object listenerConfig = brokerConfig.get(INTER_BROKER_LISTENER_NAME_CONFIG); - if (listenerConfig == null) - listenerConfig = brokerConfig.get(INTER_BROKER_SECURITY_PROTOCOL_CONFIG); - if (listenerConfig == null) - listenerConfig = "PLAINTEXT"; - listenerName = new ListenerName(listenerConfig.toString()); - - for (int i = 0; i < brokers.length; i++) { - brokerConfig.put(ServerConfigs.BROKER_ID_CONFIG, i); - currentBrokerLogDirs[i] = currentBrokerLogDirs[i] == null ? createLogDir() : currentBrokerLogDirs[i]; - brokerConfig.put(LOG_DIR_CONFIG, currentBrokerLogDirs[i]); - if (!hasListenerConfig) - brokerConfig.put(SocketServerConfigs.LISTENERS_CONFIG, listenerName.value() + "://localhost:" + currentBrokerPorts[i]); - brokers[i] = TestUtils.createServer(new KafkaConfig(brokerConfig, true), time); - currentBrokerPorts[i] = brokers[i].boundPort(listenerName); + try { + cluster.format(); + cluster.startup(); + cluster.waitForReadyBrokers(); + } catch (Exception e) { + throw new ConnectException("Failed to start test Kafka cluster", e); } Map producerProps = new HashMap<>(clientConfigs); @@ -199,149 +155,65 @@ public class EmbeddedKafkaCluster { producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer()); } - public void stopOnlyKafka() { - stop(false, false); + /** + * Restarts the Kafka brokers. This can be called after {@link #stopOnlyBrokers()}. Note that if the Kafka brokers + * need to be listening on the same ports as earlier, the {@link #brokerConfig} should contain the + * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use a fixed non-zero free port. Also note that this is + * only possible when {@code numBrokers} is 1. + */ + public void restartOnlyBrokers() { + cluster.brokers().values().forEach(BrokerServer::startup); + } + + /** + * Stop only the Kafka brokers (and not the KRaft controllers). This can be used to test Connect's functionality + * when the backing Kafka cluster goes offline. + */ + public void stopOnlyBrokers() { + cluster.brokers().values().forEach(BrokerServer::shutdown); + cluster.brokers().values().forEach(BrokerServer::awaitShutdown); } public void stop() { - stop(true, true); - } - - private void stop(boolean deleteLogDirs, boolean stopZK) { - maybeShutDownProducer(); - triggerBrokerShutdown(); - awaitBrokerShutdown(); - - if (deleteLogDirs) - deleteLogDirs(); - - if (stopZK) - stopZK(); - } - - private void maybeShutDownProducer() { - try { - if (producer != null) { - producer.close(); - } - } catch (Exception e) { - log.error("Could not shutdown producer ", e); - throw new RuntimeException("Could not shutdown producer", e); - } - } - - private void triggerBrokerShutdown() { - for (KafkaServer broker : brokers) { - try { - broker.shutdown(); - } catch (Throwable t) { - String msg = String.format("Could not shutdown broker at %s", address(broker)); - log.error(msg, t); - throw new RuntimeException(msg, t); - } - } - } - - private void awaitBrokerShutdown() { - for (KafkaServer broker : brokers) { - try { - broker.awaitShutdown(); - } catch (Throwable t) { - String msg = String.format("Failed while awaiting shutdown of broker at %s", address(broker)); - log.error(msg, t); - throw new RuntimeException(msg, t); - } - } - } - - private void deleteLogDirs() { - for (KafkaServer broker : brokers) { - try { - log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs()); - CoreUtils.delete(broker.config().logDirs()); - } catch (Throwable t) { - String msg = String.format("Could not clean up log dirs for broker at %s", - address(broker)); - log.error(msg, t); - throw new RuntimeException(msg, t); - } - } - } - - private void stopZK() { - try { - zookeeper.shutdown(); - } catch (Throwable t) { - String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString()); - log.error(msg, t); - throw new RuntimeException(msg, t); - } - } - - private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) { - if (!props.containsKey(propertyKey)) { - props.put(propertyKey, propertyValue); - } - } - - private String createLogDir() { - try { - return Files.createTempDirectory(getClass().getSimpleName()).toString(); - } catch (IOException e) { - log.error("Unable to create temporary log directory", e); - throw new ConnectException("Unable to create temporary log directory", e); + AtomicReference shutdownFailure = new AtomicReference<>(); + Utils.closeQuietly(producer, "producer for embedded Kafka cluster", shutdownFailure); + Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure); + if (shutdownFailure.get() != null) { + throw new ConnectException("Failed to shut down producer / embedded Kafka cluster", shutdownFailure.get()); } } public String bootstrapServers() { - return Arrays.stream(brokers) - .map(this::address) - .collect(Collectors.joining(",")); - } - - public String address(KafkaServer server) { - final EndPoint endPoint = server.advertisedListeners().head(); - return endPoint.host() + ":" + endPoint.port(); - } - - public String zKConnectString() { - return "127.0.0.1:" + zookeeper.port(); + return cluster.bootstrapServers(); } /** * Get the brokers that have a {@link BrokerState#RUNNING} state. * - * @return the list of {@link KafkaServer} instances that are running; - * never null but possibly empty + * @return the set of {@link BrokerServer} instances that are running; + * never null but possibly empty */ - public Set runningBrokers() { - return brokersInState(state -> state == BrokerState.RUNNING); + public Set runningBrokers() { + return brokersInState(BrokerState.RUNNING::equals); } /** * Get the brokers whose state match the given predicate. * - * @return the list of {@link KafkaServer} instances with states that match the predicate; - * never null but possibly empty + * @return the set of {@link BrokerServer} instances with states that match the predicate; + * never null but possibly empty */ - public Set brokersInState(Predicate desiredState) { - return Arrays.stream(brokers) - .filter(b -> hasState(b, desiredState)) - .collect(Collectors.toSet()); + public Set brokersInState(Predicate desiredState) { + return cluster.brokers().values().stream() + .filter(b -> desiredState.test(b.brokerState())) + .collect(Collectors.toSet()); } - protected boolean hasState(KafkaServer server, Predicate desiredState) { - try { - return desiredState.test(server.brokerState()); - } catch (Throwable e) { - // Broker failed to respond. - return false; - } - } - public boolean sslEnabled() { - final String listeners = brokerConfig.getProperty(SocketServerConfigs.LISTENERS_CONFIG); - return listeners != null && listeners.contains("SSL"); + final String listenerSecurityProtocolMap = brokerConfig.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG); + if (listenerSecurityProtocolMap == null) + return false; + return listenerSecurityProtocolMap.contains(":SSL") || listenerSecurityProtocolMap.contains(":SASL_SSL"); } /** @@ -447,9 +319,9 @@ public class EmbeddedKafkaCluster { * @param adminClientConfig Additional admin client configuration settings. */ public void createTopic(String topic, int partitions, int replication, Map topicConfig, Map adminClientConfig) { - if (replication > brokers.length) { + if (replication > cluster.brokers().size()) { throw new InvalidReplicationFactorException("Insufficient brokers (" - + brokers.length + ") for desired replication (" + replication + ")"); + + cluster.brokers().size() + ") for desired replication (" + replication + ")"); } log.info("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", @@ -498,8 +370,7 @@ public class EmbeddedKafkaCluster { Properties props = Utils.mkProperties(clientConfigs); props.putAll(adminClientConfig); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); - final Object listeners = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG); - if (listeners != null && listeners.toString().contains("SSL")) { + if (sslEnabled()) { props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value()); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); @@ -702,16 +573,16 @@ public class EmbeddedKafkaCluster { Map props = new HashMap<>(clientConfigs); props.putAll(consumerProps); - putIfAbsent(props, GROUP_ID_CONFIG, UUID.randomUUID().toString()); - putIfAbsent(props, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); - putIfAbsent(props, ENABLE_AUTO_COMMIT_CONFIG, "false"); - putIfAbsent(props, AUTO_OFFSET_RESET_CONFIG, "earliest"); - putIfAbsent(props, KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - putIfAbsent(props, VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.putIfAbsent(GROUP_ID_CONFIG, UUID.randomUUID().toString()); + props.putIfAbsent(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); + props.putIfAbsent(ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.putIfAbsent(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.putIfAbsent(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); if (sslEnabled()) { - putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); - putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); - putIfAbsent(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); } KafkaConsumer consumer; try { @@ -731,13 +602,13 @@ public class EmbeddedKafkaCluster { public KafkaProducer createProducer(Map producerProps) { Map props = new HashMap<>(clientConfigs); props.putAll(producerProps); - putIfAbsent(props, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); - putIfAbsent(props, KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - putIfAbsent(props, VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.putIfAbsent(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()); + props.putIfAbsent(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.putIfAbsent(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); if (sslEnabled()) { - putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); - putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); - putIfAbsent(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); } KafkaProducer producer; try { @@ -748,9 +619,9 @@ public class EmbeddedKafkaCluster { return producer; } - private static void putIfAbsent(final Map props, final String propertyKey, final Object propertyValue) { - if (!props.containsKey(propertyKey)) { - props.put(propertyKey, propertyValue); - } + private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) { + brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0"); + brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, String.valueOf(numBrokers)); + brokerConfig.putIfAbsent(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false"); } } diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java index 9ac3498d196..e44f5ad5219 100644 --- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java +++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java @@ -149,14 +149,14 @@ public class KafkaClusterTestKit implements AutoCloseable { public static class Builder { private TestKitNodes nodes; - private Map configProps = new HashMap<>(); - private SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory(); + private final Map configProps = new HashMap<>(); + private final SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory(); public Builder(TestKitNodes nodes) { this.nodes = nodes; } - public Builder setConfigProp(String key, String value) { + public Builder setConfigProp(String key, Object value) { this.configProps.put(key, value); return this; } @@ -165,7 +165,7 @@ public class KafkaClusterTestKit implements AutoCloseable { BrokerNode brokerNode = nodes.brokerNodes().get(node.id()); ControllerNode controllerNode = nodes.controllerNodes().get(node.id()); - Map props = new HashMap<>(configProps); + Map props = new HashMap<>(configProps); props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG, Long.toString(TimeUnit.MINUTES.toMillis(10))); props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, roles(node.id())); @@ -188,13 +188,16 @@ public class KafkaClusterTestKit implements AutoCloseable { props.put(LOG_DIRS_CONFIG, controllerNode.metadataDirectory()); } - props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, - "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); - props.put(SocketServerConfigs.LISTENERS_CONFIG, listeners(node.id())); - props.put(INTER_BROKER_LISTENER_NAME_CONFIG, - nodes.interBrokerListenerName().value()); - props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, - "CONTROLLER"); + + // We allow configuring the listeners and related properties via Builder::setConfigProp, + // and they shouldn't be overridden here + props.putIfAbsent(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, + "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"); + props.putIfAbsent(SocketServerConfigs.LISTENERS_CONFIG, listeners(node.id())); + props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, + nodes.interBrokerListenerName().value()); + props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER"); + // Note: we can't accurately set controller.quorum.voters yet, since we don't // yet know what ports each controller will pick. Set it to a dummy string // for now as a placeholder. @@ -257,7 +260,7 @@ public class KafkaClusterTestKit implements AutoCloseable { nodes.bootstrapMetadata()); } catch (Throwable e) { log.error("Error creating controller {}", node.id(), e); - Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", () -> sharedServer.stopForController()); + Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController); throw e; } controllers.put(node.id(), controller); @@ -288,7 +291,7 @@ public class KafkaClusterTestKit implements AutoCloseable { broker = new BrokerServer(sharedServer); } catch (Throwable e) { log.error("Error creating broker {}", node.id(), e); - Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", () -> sharedServer.stopForBroker()); + Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker); throw e; } brokers.put(node.id(), broker);