mirror of https://github.com/apache/kafka.git
KAFKA-14569: Migrate Connect's integration test EmbeddedKafkaCluster from ZK to KRaft mode (#16599)
Reviewers: Omnia Ibrahim <o.g.h.ibrahim@gmail.com>, Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
parent d260b06180
commit 61f61d6240
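Illustrative usage (added for orientation, not part of the commit): the tests touched below keep using the same embedded-cluster API after the KRaft migration. A minimal Java sketch, assuming the EmbeddedConnectCluster builder calls that appear throughout the diff; the concrete names and values here are placeholders:

    // build a Connect cluster backed by a Kafka KRaft cluster (no ZooKeeper)
    Properties brokerProps = new Properties();
    brokerProps.put("auto.create.topics.enable", "false");

    EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
            .name("connect-cluster")   // placeholder name
            .numWorkers(3)
            .numBrokers(1)
            .brokerProps(brokerProps)
            .build();
    connect.start();
    // ... exercise connectors against connect.kafka() ...
    connect.stop();                    // stops the Connect cluster and its backing Kafka cluster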
@@ -3193,13 +3193,14 @@ project(':connect:runtime') {
   testImplementation project(':core')
   testImplementation project(':server')
   testImplementation project(':metadata')
+  testImplementation project(':server-common')
   testImplementation project(':core').sourceSets.test.output
   testImplementation project(':server-common')
   testImplementation project(':server')
   testImplementation project(':group-coordinator')
   testImplementation project(':storage')
   testImplementation project(':connect:test-plugins')
-  testImplementation project(':group-coordinator')
+  testImplementation project(':server-common').sourceSets.test.output

   testImplementation libs.junitJupiter
   testImplementation libs.mockitoCore
@@ -3317,6 +3318,7 @@ project(':connect:file') {
   testImplementation project(':connect:runtime').sourceSets.test.output
   testImplementation project(':core')
   testImplementation project(':core').sourceSets.test.output
+  testImplementation project(':server-common').sourceSets.test.output
 }

 javadoc {
@@ -3418,6 +3420,7 @@ project(':connect:mirror') {
   testImplementation project(':core')
   testImplementation project(':core').sourceSets.test.output
   testImplementation project(':server')
+  testImplementation project(':server-common').sourceSets.test.output

   testRuntimeOnly project(':connect:runtime')
   testRuntimeOnly libs.slf4jReload4j
@@ -592,6 +592,7 @@
     <allow pkg="org.apache.kafka.server.config" />
     <allow pkg="kafka.cluster" />
     <allow pkg="kafka.server" />
+    <allow pkg="kafka.testkit" />
     <allow pkg="kafka.zk" />
     <allow pkg="kafka.utils" />
     <allow class="javax.servlet.http.HttpServletResponse" />
@@ -612,6 +613,8 @@
     <allow pkg="org.eclipse.jetty.util"/>
     <!-- for tests -->
     <allow pkg="org.apache.kafka.server.util" />
+    <allow pkg="org.apache.kafka.server.config" />
+    <allow pkg="kafka.server"/>
   </subpackage>

   <subpackage name="json">
@@ -31,7 +31,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
@@ -56,6 +56,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
 import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.apache.kafka.test.TestCondition;

 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -78,7 +79,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -86,6 +86,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.LongUnaryOperator;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;

 import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
 import static org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_CLIENT_ROLE_PREFIX;
@@ -301,12 +302,30 @@ public class MirrorConnectorsIntegrationBaseTest {
         MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
         MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));

-        // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(primary, reverseTopic1);
-        waitForTopicCreated(backup, backupTopic1);
         waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal");
-        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), backupTopic1, TopicConfig.CLEANUP_POLICY_CONFIG),
-                "topic config was not synced");
+        TestCondition assertBackupTopicConfig = () -> {
+            String compactPolicy = getTopicConfig(backup.kafka(), backupTopic1, TopicConfig.CLEANUP_POLICY_CONFIG);
+            assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, compactPolicy, "topic config was not synced");
+            return true;
+        };
+
+        if (replicateBackupToPrimary) {
+            // make sure the topics are auto-created in the other cluster
+            waitForTopicCreated(primary, reverseTopic1);
+            waitForTopicCreated(backup, backupTopic1);
+            assertBackupTopicConfig.conditionMet();
+        } else {
+            // The backup and reverse topics are identical to the topics we created while setting up the test;
+            // we don't have to wait for them to be created, but there might be a small delay between
+            // now and when MM2 is able to sync the config for the backup topic
+            waitForCondition(
+                assertBackupTopicConfig,
+                10_000, // Topic config sync interval is one second; this should be plenty of time
+                "topic config was not synced in time"
+            );
+        }

         createAndTestNewTopicWithConfigFilter();

         assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
@@ -429,12 +448,12 @@ public class MirrorConnectorsIntegrationBaseTest {
         try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
             waitForConsumingAllRecords(primaryConsumer, expectedRecords);
         }

         // one way replication from primary to backup
         mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
         mm2Config = new MirrorMakerConfig(mm2Props);
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);

         // sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record
         Thread.sleep(TimeUnit.SECONDS.toMillis(3));

@@ -445,7 +464,7 @@ public class MirrorConnectorsIntegrationBaseTest {
                 backupTopic)) {
             waitForConsumingAllRecords(backupConsumer, expectedRecords);
         }

         try (Admin backupClient = backup.kafka().createAdminClient()) {
             // retrieve the consumer group offset from backup cluster
             Map<TopicPartition, OffsetAndMetadata> remoteOffsets =
@@ -1189,14 +1208,11 @@ public class MirrorConnectorsIntegrationBaseTest {
      * @param records Records to send in one parallel batch
      */
     protected void produceMessages(Producer<byte[], byte[]> producer, List<ProducerRecord<byte[], byte[]>> records) {
-        List<Future<RecordMetadata>> futures = new ArrayList<>();
-        for (ProducerRecord<byte[], byte[]> record : records) {
-            futures.add(producer.send(record));
-        }
         Timer timer = Time.SYSTEM.timer(RECORD_PRODUCE_DURATION_MS);
+
         try {
-            for (Future<RecordMetadata> future : futures) {
-                future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
+            for (ProducerRecord<byte[], byte[]> record : records) {
+                producer.send(record).get(timer.remainingMs(), TimeUnit.MILLISECONDS);
                 timer.update();
             }
         } catch (ExecutionException | InterruptedException | TimeoutException e) {
@@ -1397,13 +1413,46 @@ public class MirrorConnectorsIntegrationBaseTest {
     /*
      * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
      */
-    protected void warmUpConsumer(Map<String, Object> consumerProps) {
-        try (Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
-            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            dummyConsumer.commitSync();
+    protected final void warmUpConsumer(Map<String, Object> consumerProps) {
+        final String topic = "test-topic-1";
+        warmUpConsumer("primary", primary.kafka(), consumerProps, topic);
+        warmUpConsumer("backup", backup.kafka(), consumerProps, topic);
     }
-        try (Consumer<byte[], byte[]> dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+
+    private void warmUpConsumer(String clusterName, EmbeddedKafkaCluster kafkaCluster, Map<String, Object> consumerProps, String topic) {
+        try (Consumer<?, ?> dummyConsumer = kafkaCluster.createConsumerAndSubscribeTo(consumerProps, topic)) {
+            // poll to ensure we've joined the group
             dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+
+            // force the consumer to have a known position on every topic partition
+            // so that it will be able to commit offsets for that position
+            // (it's possible that poll returns before that has happened)
+            Set<TopicPartition> topicPartitionsPendingPosition = IntStream.range(0, NUM_PARTITIONS)
+                    .mapToObj(partition -> new TopicPartition(topic, partition))
+                    .collect(Collectors.toSet());
+            Timer positionTimer = Time.SYSTEM.timer(60_000);
+            while (!positionTimer.isExpired() && !topicPartitionsPendingPosition.isEmpty()) {
+                Set<TopicPartition> topicPartitionsWithPosition = new HashSet<>();
+
+                topicPartitionsPendingPosition.forEach(topicPartition -> {
+                    try {
+                        positionTimer.update();
+                        dummyConsumer.position(topicPartition, Duration.ofMillis(positionTimer.remainingMs()));
+                        topicPartitionsWithPosition.add(topicPartition);
+                    } catch (KafkaException e) {
+                        log.warn("Failed to calculate consumer position for {} on cluster {}", topicPartition, clusterName);
+                    }
+                });
+
+                topicPartitionsPendingPosition.removeAll(topicPartitionsWithPosition);
+            }
+            assertEquals(
+                    Collections.emptySet(),
+                    topicPartitionsPendingPosition,
+                    "Failed to calculate consumer position for one or more partitions on cluster " + clusterName + " in time"
+            );
+
+            // And finally, commit offsets
             dummyConsumer.commitSync();
         }
     }
@@ -21,7 +21,6 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.ConnectionMode;
 import org.apache.kafka.network.SocketServerConfigs;
-import org.apache.kafka.server.config.ReplicationConfigs;
 import org.apache.kafka.test.TestSslUtils;
 import org.apache.kafka.test.TestUtils;

@@ -42,25 +41,24 @@ public class MirrorConnectorsIntegrationSSLTest extends MirrorConnectorsIntegrat
     public void startClusters() throws Exception {
         Map<String, Object> sslConfig = TestSslUtils.createSslConfig(false, true, ConnectionMode.SERVER, TestUtils.tempFile(), "testCert");
         // enable SSL on backup kafka broker
-        backupBrokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:0");
-        backupBrokerProps.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL");
+        backupBrokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "EXTERNAL:SSL,CONTROLLER:SSL");
         backupBrokerProps.putAll(sslConfig);

         Properties sslProps = new Properties();
         sslProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
         sslProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
         sslProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

         // set SSL config for kafka connect worker
         backupWorkerProps.putAll(sslProps.entrySet().stream().collect(Collectors.toMap(
             e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))));

         mm2Props.putAll(sslProps.entrySet().stream().collect(Collectors.toMap(
             e -> BACKUP_CLUSTER_ALIAS + "." + e.getKey(), e -> String.valueOf(e.getValue()))));
         // set SSL config for producer used by source task in MM2
         mm2Props.putAll(sslProps.entrySet().stream().collect(Collectors.toMap(
             e -> BACKUP_CLUSTER_ALIAS + ".producer." + e.getKey(), e -> String.valueOf(e.getValue()))));

         super.startClusters();
     }
 }
@@ -25,6 +25,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourcePatternFilter;
@@ -33,6 +34,9 @@ import org.apache.kafka.connect.mirror.MirrorMakerConfig;
 import org.apache.kafka.connect.mirror.clients.admin.FakeForwardingAdminWithLocalMetadata;
 import org.apache.kafka.connect.mirror.clients.admin.FakeLocalMetadataStore;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.server.config.KRaftConfigs;
+import org.apache.kafka.server.config.ServerConfigs;

 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -62,23 +66,26 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 @Tag("integration")
 public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends MirrorConnectorsIntegrationBaseTest {

+    private static final int TOPIC_ACL_SYNC_DURATION_MS = 30_000;
     private static final int FAKE_LOCAL_METADATA_STORE_SYNC_DURATION_MS = 60_000;

     /*
      * enable ACL on brokers.
      */
     protected static void enableAclAuthorizer(Properties brokerProps) {
-        brokerProps.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer");
-        brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
-        brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
-        brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
-        brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0");
-        brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config",
-                "org.apache.kafka.common.security.plain.PlainLoginModule required "
-                        + "username=\"super\" "
-                        + "password=\"super_pwd\" "
-                        + "user_connector=\"connector_pwd\" "
-                        + "user_super=\"super_pwd\";");
+        brokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
+        brokerProps.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, "org.apache.kafka.metadata.authorizer.StandardAuthorizer");
+        brokerProps.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, "PLAIN");
+        brokerProps.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN");
+        brokerProps.put(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, "PLAIN");
+        String listenerSaslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required "
+                + "username=\"super\" "
+                + "password=\"super_pwd\" "
+                + "user_connector=\"connector_pwd\" "
+                + "user_super=\"super_pwd\";";
+        brokerProps.put("listener.name.external.plain.sasl.jaas.config", listenerSaslJaasConfig);
+        brokerProps.put("listener.name.controller.plain.sasl.jaas.config", listenerSaslJaasConfig);
         brokerProps.put("super.users", "User:super");
     }

@@ -293,6 +300,7 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
     @Test
     public void testSyncTopicACLsUseProvidedForwardingAdmin() throws Exception {
         mm2Props.put("sync.topic.acls.enabled", "true");
+        mm2Props.put("sync.topic.acls.interval.seconds", "1");
         mm2Config = new MirrorMakerConfig(mm2Props);
         List<AclBinding> aclBindings = Collections.singletonList(
             new AclBinding(
@@ -324,8 +332,16 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
             new AccessControlEntry("User:dummy", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)
         );

-        assertTrue(getAclBindings(backup.kafka(), "primary.test-topic-1").contains(expectedACLOnBackupCluster), "topic ACLs was synced");
-        assertTrue(getAclBindings(primary.kafka(), "backup.test-topic-1").contains(expectedACLOnPrimaryCluster), "topic ACLs was synced");
+        // In some rare cases replica topics are created before ACLs are synced, so retry logic is necessary
+        waitForCondition(
+            () -> {
+                assertTrue(getAclBindings(backup.kafka(), "primary.test-topic-1").contains(expectedACLOnBackupCluster), "topic ACLs are not synced on backup cluster");
+                assertTrue(getAclBindings(primary.kafka(), "backup.test-topic-1").contains(expectedACLOnPrimaryCluster), "topic ACLs are not synced on primary cluster");
+                return true;
+            },
+            TOPIC_ACL_SYNC_DURATION_MS,
+            "Topic ACLs were not synced in time"
+        );

         // expect to use FakeForwardingAdminWithLocalMetadata to update topic ACLs in FakeLocalMetadataStore.allAcls
         assertTrue(FakeLocalMetadataStore.aclBindings("dummy").containsAll(Arrays.asList(expectedACLOnBackupCluster, expectedACLOnPrimaryCluster)));
@@ -123,7 +123,7 @@ public class BlockingConnectorTest {

     @BeforeEach
     public void setup() throws Exception {
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -138,7 +138,7 @@ public class BlockingConnectorTest {

     @AfterEach
     public void close() {
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
         // unblock everything so that we don't leak threads after each test run
         Block.reset();
@@ -43,6 +43,7 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.SinkUtils;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.WorkerHandle;
+import org.apache.kafka.network.SocketServerConfigs;
 import org.apache.kafka.test.TestUtils;

 import org.junit.jupiter.api.AfterEach;
@@ -57,6 +58,8 @@ import org.slf4j.event.Level;

 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.ServerSocket;
 import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Collections;
@@ -118,6 +121,7 @@ public class ConnectWorkerIntegrationTest {
     private static final Logger log = LoggerFactory.getLogger(ConnectWorkerIntegrationTest.class);

     private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final long RECORD_TRANSFER_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(60);
     private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
     private static final int NUM_TASKS = 4;
@@ -142,7 +146,7 @@ public class ConnectWorkerIntegrationTest {
         brokerProps = new Properties();
         brokerProps.put("auto.create.topics.enable", String.valueOf(false));

-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connectBuilder = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -154,7 +158,7 @@ public class ConnectWorkerIntegrationTest {
     @AfterEach
     public void close(TestInfo testInfo) {
         log.info("Finished test {}", testInfo.getDisplayName());
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
     }

@@ -244,8 +248,11 @@ public class ConnectWorkerIntegrationTest {
     public void testBrokerCoordinator() throws Exception {
         ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
         workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000));
-        connect = connectBuilder.workerProps(workerProps).build();
+
+        useFixedBrokerPort();
+
         // start the clusters
+        connect = connectBuilder.build();
         connect.start();
         int numTasks = 4;
         // create test topic
@@ -263,7 +270,7 @@ public class ConnectWorkerIntegrationTest {
         // expect that the connector will be stopped once the coordinator is detected to be down
         StartAndStopLatch stopLatch = connectorHandle.expectedStops(1, false);

-        connect.kafka().stopOnlyKafka();
+        connect.kafka().stopOnlyBrokers();

         // Allow for the workers to discover that the coordinator is unavailable, wait is
         // heartbeat timeout * 2 + 4sec
@@ -294,7 +301,7 @@ public class ConnectWorkerIntegrationTest {
                 + CONNECTOR_SETUP_DURATION_MS + "ms");

         StartAndStopLatch startLatch = connectorHandle.expectedStarts(1, false);
-        connect.kafka().startOnlyKafkaOnSamePorts();
+        connect.kafka().restartOnlyBrokers();

         // Allow for the kafka brokers to come back online
         Thread.sleep(TimeUnit.SECONDS.toMillis(10));
@@ -835,8 +842,10 @@ public class ConnectWorkerIntegrationTest {
         // Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to
         // be spuriously triggered after the group coordinator for a Connect cluster is bounced
         workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "0");
+
+        useFixedBrokerPort();
+
         connect = connectBuilder
-                .numBrokers(1)
                 .numWorkers(1)
                 .build();
         connect.start();
@@ -854,7 +863,7 @@ public class ConnectWorkerIntegrationTest {

         // Bring down Kafka, which should cause some REST requests to fail
         log.info("Stopping Kafka cluster");
-        connect.kafka().stopOnlyKafka();
+        connect.kafka().stopOnlyBrokers();

         // Try to reconfigure the connector, which should fail with a timeout error
         log.info("Trying to reconfigure connector while Kafka cluster is down");
@@ -863,7 +872,7 @@ public class ConnectWorkerIntegrationTest {
                 "flushing updates to the status topic"
         );
         log.info("Restarting Kafka cluster");
-        connect.kafka().startOnlyKafkaOnSamePorts();
+        connect.kafka().restartOnlyBrokers();
         connect.assertions().assertExactlyNumBrokersAreUp(1, "Broker did not complete startup in time");
         log.info("Kafka cluster is restarted");

@@ -1182,7 +1191,7 @@ public class ConnectWorkerIntegrationTest {
                 NUM_TASKS,
                 "Connector or its tasks did not start in time"
         );
-        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+        connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);

         connect.deleteConnector(CONNECTOR_NAME);

@@ -1223,7 +1232,7 @@ public class ConnectWorkerIntegrationTest {
                 NUM_TASKS,
                 "Connector or its tasks did not start in time"
         );
-        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+        connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);

         // See if any new records got written to the old topic
         final long nextEndOffset = connect.kafka().endOffset(connectorTopicPartition);
@@ -1282,7 +1291,7 @@ public class ConnectWorkerIntegrationTest {
                 NUM_TASKS,
                 "Connector or its tasks did not start in time"
         );
-        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+        connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);

         // Delete the secrets file, which should render the old task configs invalid
         assertTrue(secretsFile.delete(), "Failed to delete secrets file");
@@ -1307,7 +1316,7 @@ public class ConnectWorkerIntegrationTest {

         // Wait for at least one task to commit offsets after being restarted
         connectorHandle.expectedCommits(1);
-        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+        connectorHandle.awaitCommits(RECORD_TRANSFER_TIMEOUT_MS);

         final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0));
         assertTrue(
@@ -1446,6 +1455,23 @@ public class ConnectWorkerIntegrationTest {
         return props;
     }

+    private void useFixedBrokerPort() throws IOException {
+        // Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners
+        // config to get a random free port because in this test we want to stop the Kafka broker and then bring it
+        // back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka
+        // and continue functioning normally. If we were to use port 0 here, the Kafka broker would most likely listen
+        // on a different random free port the second time it is started. Note that we can only use the static port
+        // because we have a single broker setup in this test.
+        int listenerPort;
+        try (ServerSocket s = new ServerSocket(0)) {
+            listenerPort = s.getLocalPort();
+        }
+        brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, String.format("EXTERNAL://localhost:%d,CONTROLLER://localhost:0", listenerPort));
+        connectBuilder
+                .numBrokers(1)
+                .brokerProps(brokerProps);
+    }
+
     public static class EmptyTaskConfigsConnector extends SinkConnector {
         @Override
         public String version() {
@@ -110,7 +110,7 @@ public class ConnectorClientPolicyIntegrationTest {
         Properties exampleBrokerProps = new Properties();
         exampleBrokerProps.put("auto.create.topics.enable", "false");

-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -119,7 +119,6 @@ public class ConnectorRestartApiIntegrationTest {

     @AfterAll
     public static void close() {
-        // stop all Connect, Kafka and Zk threads.
         CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
     }

@@ -127,7 +126,7 @@ public class ConnectorRestartApiIntegrationTest {
     public void testRestartUnknownConnectorNoParams() throws Exception {
         String connectorName = "Unknown";

-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         startOrReuseConnectWithNumWorkers(ONE_WORKER);
         // Call the Restart API
         String restartEndpoint = connect.endpointForResource(
@@ -148,7 +147,7 @@ public class ConnectorRestartApiIntegrationTest {
     private void restartUnknownConnector(boolean onlyFailed, boolean includeTasks) throws Exception {
         String connectorName = "Unknown";

-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         startOrReuseConnectWithNumWorkers(ONE_WORKER);
         // Call the Restart API
         String restartEndpoint = connect.endpointForResource(
@@ -299,7 +298,7 @@ public class ConnectorRestartApiIntegrationTest {
         // setup up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
         props.put("connector.start.inject.error", "true");
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         startOrReuseConnectWithNumWorkers(ONE_WORKER);

         // Try to start the connector and its single task.
@@ -330,7 +329,7 @@ public class ConnectorRestartApiIntegrationTest {
         // setup up props for the source connector
         Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
         tasksToFail.forEach(taskId -> props.put("task-" + taskId + ".start.inject.error", "true"));
-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         startOrReuseConnectWithNumWorkers(ONE_WORKER);

         // Try to start the connector and its single task.
@@ -90,7 +90,7 @@ public class ConnectorTopicsIntegrationTest {
         // setup Kafka broker properties
         brokerProps.put("auto.create.topics.enable", String.valueOf(false));

-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by a Kafka KRaft cluster
         connectBuilder = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -101,7 +101,7 @@ public class ConnectorTopicsIntegrationTest {

     @AfterEach
     public void close() {
-        // stop all Connect, Kafka and Zk threads.
+        // stop the Connect cluster and its backing Kafka cluster.
         connect.stop();
     }

@@ -80,7 +80,6 @@ public class ConnectorValidationIntegrationTest {
                 TestPlugins.pluginPathJoined(testPlugins)
         );

-        // build a Connect cluster backed by Kafka and Zk
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connector-validation-connect-cluster")
                 .workerProps(workerProps)
@@ -93,7 +92,6 @@ public class ConnectorValidationIntegrationTest {
     @AfterAll
     public static void close() {
         if (connect != null) {
-            // stop all Connect, Kafka and Zk threads.
             Utils.closeQuietly(connect::stop, "Embedded Connect cluster");
         }
     }
@@ -29,6 +29,7 @@ import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
@@ -42,6 +43,7 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -50,6 +52,10 @@ import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.ConnectAssertions;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.server.config.KRaftConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.test.NoRetryException;

 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -116,6 +122,7 @@ public class ExactlyOnceSourceIntegrationTest {

     private static final int CONSUME_RECORDS_TIMEOUT_MS = 60_000;
     private static final int SOURCE_TASK_PRODUCE_TIMEOUT_MS = 30_000;
+    private static final int ACL_PROPAGATION_TIMEOUT_MS = 30_000;
     private static final int DEFAULT_NUM_WORKERS = 3;

     // Tests require that a minimum but not unreasonably large number of records are sourced.
@@ -140,7 +147,7 @@ public class ExactlyOnceSourceIntegrationTest {
         brokerProps.put("transaction.state.log.replication.factor", "1");
         brokerProps.put("transaction.state.log.min.isr", "1");

-        // build a Connect cluster backed by Kafka and Zk
+        // build a Connect cluster backed by Kafka
         connectBuilder = new EmbeddedConnectCluster.Builder()
                 .numWorkers(DEFAULT_NUM_WORKERS)
                 .numBrokers(1)
@@ -159,7 +166,7 @@ public class ExactlyOnceSourceIntegrationTest {
     @AfterEach
     public void close() {
         try {
-            // stop all Connect, Kafka and Zk threads.
+            // stop the Connect cluster and its backing Kafka cluster.
             connect.stop();
         } finally {
             // Clear the handle for the connector. Fun fact: if you don't do this, your tests become quite flaky.
@@ -624,17 +631,18 @@ public class ExactlyOnceSourceIntegrationTest {
      */
     @Test
     public void testTasksFailOnInabilityToFence() throws Exception {
-        brokerProps.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer");
-        brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
-        brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
-        brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
-        brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0");
-        brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config",
-                "org.apache.kafka.common.security.plain.PlainLoginModule required "
-                        + "username=\"super\" "
-                        + "password=\"super_pwd\" "
-                        + "user_connector=\"connector_pwd\" "
-                        + "user_super=\"super_pwd\";");
+        brokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT");
+        brokerProps.put(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, "org.apache.kafka.metadata.authorizer.StandardAuthorizer");
+        brokerProps.put(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, "PLAIN");
+        brokerProps.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, "PLAIN");
+        brokerProps.put(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, "PLAIN");
+        String listenerSaslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required "
+                + "username=\"super\" "
+                + "password=\"super_pwd\" "
+                + "user_connector=\"connector_pwd\" "
+                + "user_super=\"super_pwd\";";
+        brokerProps.put("listener.name.external.plain.sasl.jaas.config", listenerSaslJaasConfig);
+        brokerProps.put("listener.name.controller.plain.sasl.jaas.config", listenerSaslJaasConfig);
         brokerProps.put("super.users", "User:super");

         Map<String, String> superUserClientConfig = new HashMap<>();
@@ -694,12 +702,30 @@ public class ExactlyOnceSourceIntegrationTest {
             )).all().get();
         }

-        StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax);

         log.info("Bringing up connector with fresh slate; fencing should not be necessary");
         connect.configureConnector(CONNECTOR_NAME, props);
-        assertConnectorStarted(connectorStart);
-        // Verify that the connector and its tasks have been able to start successfully
+        // Hack: There is a small chance that our recent ACL updates for the connector have
+        // not yet been propagated across the entire Kafka cluster, and that our connector
+        // will fail on startup when it tries to list the end offsets of the worker's offsets topic
+        // So, we implement some retry logic here to add a layer of resiliency in that case
+        waitForCondition(
+                () -> {
+                    ConnectorStateInfo status = connect.connectorStatus(CONNECTOR_NAME);
+                    if ("RUNNING".equals(status.connector().state())) {
+                        return true;
+                    } else if ("FAILED".equals(status.connector().state())) {
+                        log.debug("Restarting failed connector {}", CONNECTOR_NAME);
+                        connect.restartConnector(CONNECTOR_NAME);
+                    }
+                    return false;
+                },
+                ACL_PROPAGATION_TIMEOUT_MS,
+                "Connector was not able to start in time, "
+                        + "or ACL updates were not propagated across the Kafka cluster soon enough"
+        );
+
+        // Also verify that the connector's tasks have been able to start successfully
         connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, tasksMax, "Connector and task should have started successfully");

         log.info("Reconfiguring connector; fencing should be necessary, and tasks should fail to start");
@ -725,8 +751,39 @@ public class ExactlyOnceSourceIntegrationTest {
|
||||||
|
|
||||||
log.info("Restarting connector after tweaking its ACLs; fencing should succeed this time");
|
log.info("Restarting connector after tweaking its ACLs; fencing should succeed this time");
|
||||||
connect.restartConnectorAndTasks(CONNECTOR_NAME, false, true, false);
|
connect.restartConnectorAndTasks(CONNECTOR_NAME, false, true, false);
|
||||||
|
|
||||||
// Verify that the connector and its tasks have been able to restart successfully
|
// Verify that the connector and its tasks have been able to restart successfully
|
||||||
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, tasksMax, "Connector and task should have restarted successfully");
|
// Use the same retry logic as above, in case there is a delay in the propagation of our ACL updates
|
||||||
|
waitForCondition(
|
||||||
|
() -> {
|
||||||
|
ConnectorStateInfo status = connect.connectorStatus(CONNECTOR_NAME);
|
||||||
|
boolean connectorRunning = "RUNNING".equals(status.connector().state());
|
||||||
|
boolean allTasksRunning = status.tasks().stream()
|
||||||
|
.allMatch(t -> "RUNNING".equals(t.state()));
|
||||||
|
boolean expectedNumTasks = status.tasks().size() == tasksMax;
|
||||||
|
if (connectorRunning && allTasksRunning && expectedNumTasks) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
if (!connectorRunning) {
|
||||||
|
if ("FAILED".equals(status.connector().state())) {
|
||||||
|
// Only task failures are expected ;if the connector has failed, something
|
||||||
|
// else is wrong and we should fail the test immediately
|
||||||
|
throw new NoRetryException(
|
||||||
|
new AssertionError("Connector " + CONNECTOR_NAME + " has failed unexpectedly")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Restart all failed tasks
|
||||||
|
status.tasks().stream()
|
||||||
|
.filter(t -> "FAILED".equals(t.state()))
|
||||||
|
.map(ConnectorStateInfo.TaskState::id)
|
||||||
|
.forEach(t -> connect.restartTask(CONNECTOR_NAME, t));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
ConnectAssertions.CONNECTOR_SETUP_DURATION_MS,
|
||||||
|
"Connector and task should have restarted successfully"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
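For context, the retry blocks added above use the generic waitForCondition helper from Kafka's test support code: the condition is polled until it returns true or the timeout elapses, and throwing NoRetryException from inside the condition aborts the wait immediately. A minimal standalone sketch of that pattern follows; the org.apache.kafka.test package names and the checkSomething()/fatalProblemDetected() helpers are assumptions for illustration, not part of this change.

import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;

public class RetryPatternSketch {
    public void waitForThing() throws InterruptedException {
        TestUtils.waitForCondition(
            () -> {
                // Throwing NoRetryException fails the wait immediately instead of retrying
                // until the timeout; useful when a failure is known to be unrecoverable.
                if (fatalProblemDetected()) {
                    throw new NoRetryException(new AssertionError("unrecoverable failure"));
                }
                return checkSomething();
            },
            30_000,
            "Condition was not met in time"
        );
    }

    // Hypothetical placeholders standing in for real test checks.
    private boolean fatalProblemDetected() { return false; }
    private boolean checkSomething() { return true; }
}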
@ -79,7 +79,7 @@ public class ExampleConnectIntegrationTest {
 Properties exampleBrokerProps = new Properties();
 exampleBrokerProps.put("auto.create.topics.enable", "false");

-// build a Connect cluster backed by Kafka and Zk
+// build a Connect cluster backed by a Kafka KRaft cluster
 connect = new EmbeddedConnectCluster.Builder()
     .name("connect-cluster")
     .numWorkers(NUM_WORKERS)

@ -100,7 +100,7 @@ public class ExampleConnectIntegrationTest {
 // delete connector handle
 RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);

-// stop all Connect, Kafka and Zk threads.
+// stop the Connect cluster and its backing Kafka cluster.
 connect.stop();
 }
@ -64,7 +64,7 @@ public class InternalTopicsIntegrationTest {

 @AfterEach
 public void close() {
-// stop all Connect, Kafka and Zk threads.
+// stop the Connect workers and Kafka brokers.
 connect.stop();
 }
@ -120,7 +120,6 @@ public class OffsetsApiIntegrationTest {

 @AfterAll
 public static void close() {
-// stop all Connect, Kafka and Zk threads.
 CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
 // wait for all blocked threads created while testing zombie task scenarios to finish
 BlockingConnectorTest.Block.join();
@ -87,7 +87,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
 Properties brokerProps = new Properties();
 brokerProps.put("auto.create.topics.enable", "false");

-// build a Connect cluster backed by Kafka and Zk
+// build a Connect cluster backed by a Kafka KRaft cluster
 connect = new EmbeddedConnectCluster.Builder()
     .name("connect-cluster")
     .numWorkers(NUM_WORKERS)

@ -103,7 +103,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
 @AfterEach
 public void close(TestInfo testInfo) {
 log.info("Finished test {}", testInfo.getDisplayName());
-// stop all Connect, Kafka and Zk threads.
+// stop the Connect cluster and its backing Kafka cluster.
 connect.stop();
 }
@ -67,7 +67,7 @@ public class RestExtensionIntegrationTest {
 Map<String, String> workerProps = new HashMap<>();
 workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName());

-// build a Connect cluster backed by Kafka and Zk
+// build a Connect cluster backed by a Kafka KRaft cluster
 connect = new EmbeddedConnectCluster.Builder()
     .name("connect-cluster")
     .numWorkers(NUM_WORKERS)

@ -135,7 +135,7 @@ public class RestExtensionIntegrationTest {

 @AfterEach
 public void close() {
-// stop all Connect, Kafka and Zk threads.
+// stop the Connect cluster and its backing Kafka cluster.
 connect.stop();
 IntegrationTestRestExtension.instance = null;
 }
@ -64,7 +64,7 @@ public class SessionedProtocolIntegrationTest {
 Map<String, String> workerProps = new HashMap<>();
 workerProps.put(CONNECT_PROTOCOL_CONFIG, ConnectProtocolCompatibility.SESSIONED.protocol());

-// build a Connect cluster backed by Kafka and Zk
+// build a Connect cluster backed by a Kafka KRaft cluster
 connect = new EmbeddedConnectCluster.Builder()
     .name("connect-cluster")
     .numWorkers(2)

@ -81,7 +81,7 @@ public class SessionedProtocolIntegrationTest {

 @AfterEach
 public void close() {
-// stop all Connect, Kafka and Zk threads.
+// stop the Connect cluster and its backing Kafka cluster.
 connect.stop();
 }
@ -75,7 +75,7 @@ public class SinkConnectorsIntegrationTest {
 brokerProps.put("auto.create.topics.enable", "false");
 brokerProps.put("delete.topic.enable", "true");

-// build a Connect cluster backed by Kafka and Zk
+// build a Connect cluster backed by a Kafka KRaft cluster
 connect = new EmbeddedConnectCluster.Builder()
     .name("connect-cluster")
     .numWorkers(NUM_WORKERS)

@ -90,7 +90,7 @@ public class SinkConnectorsIntegrationTest {
 // delete connector handle
 RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);

-// stop all Connect, Kafka and Zk threads.
+// stop the Connect cluster and its backing Kafka cluster.
 connect.stop();
 }
@ -80,7 +80,7 @@ public class SourceConnectorsIntegrationTest {
 // setup Kafka broker properties
 brokerProps.put("auto.create.topics.enable", String.valueOf(false));

-// build a Connect cluster backed by Kafka and Zk
+// build a Connect cluster backed by a Kafka KRaft cluster
 connectBuilder = new EmbeddedConnectCluster.Builder()
     .name("connect-cluster")
     .numWorkers(NUM_WORKERS)

@ -91,7 +91,7 @@ public class SourceConnectorsIntegrationTest {

 @AfterEach
 public void close() {
-// stop all Connect, Kafka and Zk threads.
+// stop the Connect cluster and its backing Kafka cluster.
 connect.stop();
 }
@ -84,7 +84,7 @@ public class TransformationIntegrationTest {
 // This is required because tests in this class also test per-connector topic creation with transformations
 brokerProps.put("auto.create.topics.enable", "false");

-// build a Connect cluster backed by Kafka and Zk
+// build a Connect cluster backed by a Kafka KRaft cluster
 connect = new EmbeddedConnectCluster.Builder()
     .name("connect-cluster")
     .numWorkers(NUM_WORKERS)

@ -105,7 +105,7 @@ public class TransformationIntegrationTest {
 // delete connector handle
 RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);

-// stop all Connect, Kafka and Zk threads.
+// stop the Connect cluster and its backing Kafka cluster.
 connect.stop();
 }
@ -121,7 +121,7 @@ abstract class EmbeddedConnect {
 };

 /**
- * Start the Connect cluster and the embedded Kafka and Zookeeper cluster,
+ * Start the Connect cluster and the embedded Kafka KRaft cluster,
  * and wait for the Kafka and Connect clusters to become healthy.
  */
 public void start() {

@ -163,7 +163,7 @@ abstract class EmbeddedConnect {
 }

 /**
- * Stop the connect cluster and the embedded Kafka and Zookeeper cluster.
+ * Stop the Connect cluster and the embedded Kafka KRaft cluster.
  * Clean up any temp directories created locally.
  *
  * @throws RuntimeException if Kafka brokers fail to stop
@ -41,9 +41,11 @@ import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STA
 import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG;

 /**
- * Start an embedded connect cluster. Internally, this class will spin up a Kafka and Zk cluster, set up any tmp
- * directories, and clean them up on exit. Methods on the same {@code EmbeddedConnectCluster} are
- * not guaranteed to be thread-safe.
+ * Start an embedded Connect cluster that can be used for integration tests. Internally, this class also spins up a
+ * backing Kafka KRaft cluster for the Connect cluster leveraging {@link kafka.testkit.KafkaClusterTestKit}. Methods
+ * on the same {@code EmbeddedConnectCluster} are not guaranteed to be thread-safe. This class also provides various
+ * utility methods to perform actions on the Connect cluster such as connector creation, config validation, connector
+ * restarts, pause / resume, connector deletion etc.
  */
 public class EmbeddedConnectCluster extends EmbeddedConnect {
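The updated javadoc above summarizes the harness at a high level. A rough sketch of how an integration test typically wires it up is shown below; the brokerProps(...) and build() builder methods and the connector config values are assumptions for illustration, while name(...), numWorkers(...), configureConnector(...), start() and stop() all appear in this diff.

import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ExampleEmbeddedConnectUsage {
    public void runExample() {
        Properties brokerProps = new Properties();
        brokerProps.put("auto.create.topics.enable", "false");

        // Build a Connect cluster; the backing Kafka KRaft cluster is created internally.
        EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
                .name("connect-cluster")
                .numWorkers(1)
                .brokerProps(brokerProps)   // assumed builder method, not shown in this diff
                .build();                   // assumed builder method
        connect.start();

        // Create a connector through the workers' REST API; the config values below are illustrative.
        Map<String, String> connectorProps = new HashMap<>();
        connectorProps.put("connector.class", "com.example.MySourceConnector"); // hypothetical plugin
        connectorProps.put("tasks.max", "1");
        connect.configureConnector("example-connector", connectorProps);

        // Stop the Connect cluster and its backing Kafka cluster.
        connect.stop();
    }
}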
@ -47,7 +47,7 @@ import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_C
 import static org.apache.kafka.connect.runtime.standalone.StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG;

 /**
- * Start a standalone embedded connect worker. Internally, this class will spin up a Kafka and Zk cluster,
+ * Start a standalone embedded connect worker. Internally, this class will spin up a Kafka cluster,
  * set up any tmp directories. and clean them up on exit. Methods on the same
  * {@code EmbeddedConnectStandalone} are not guaranteed to be thread-safe.
  */
@ -16,12 +16,9 @@
  */
 package org.apache.kafka.connect.util.clusters;

-import kafka.cluster.EndPoint;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.CoreUtils;
-import kafka.utils.TestUtils;
-import kafka.zk.EmbeddedZookeeper;
+import kafka.server.BrokerServer;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;

 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;

@ -49,23 +46,17 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.metadata.BrokerState;
 import org.apache.kafka.network.SocketServerConfigs;
-import org.apache.kafka.server.config.ServerConfigs;
-import org.apache.kafka.server.config.ZkConfigs;
-import org.apache.kafka.storage.internals.log.CleanerConfig;
+import org.apache.kafka.server.config.ServerLogConfigs;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.IOException;
-import java.nio.file.Files;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@ -83,6 +74,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@ -95,98 +87,62 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-import static org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;

 /**
- * Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for
- * integration tests.
+ * Setup an embedded Kafka KRaft cluster (using {@link kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties. This can be used for integration tests and is
+ * typically used in conjunction with {@link EmbeddedConnectCluster}. Additional Kafka client properties can also be
+ * supplied if required. This class also provides various utility methods to easily create Kafka topics, produce data,
+ * consume data etc.
  */
 public class EmbeddedKafkaCluster {

     private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);

     private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = TimeUnit.SECONDS.toMillis(120);

-    // Kafka Config
-    private final KafkaServer[] brokers;
+    private final KafkaClusterTestKit cluster;
     private final Properties brokerConfig;
-    private final Time time = Time.SYSTEM;
-    private final int[] currentBrokerPorts;
-    private final String[] currentBrokerLogDirs;
-    private final boolean hasListenerConfig;
-
-    final Map<String, String> clientConfigs;
-
-    private EmbeddedZookeeper zookeeper = null;
-    private ListenerName listenerName = new ListenerName("PLAINTEXT");
+    private final Map<String, String> clientConfigs;
     private KafkaProducer<byte[], byte[]> producer;

-    public EmbeddedKafkaCluster(final int numBrokers,
-                                final Properties brokerConfig) {
+    public EmbeddedKafkaCluster(final int numBrokers, final Properties brokerConfig) {
         this(numBrokers, brokerConfig, Collections.emptyMap());
     }

     public EmbeddedKafkaCluster(final int numBrokers,
                                 final Properties brokerConfig,
                                 final Map<String, String> clientConfigs) {
-        brokers = new KafkaServer[numBrokers];
-        currentBrokerPorts = new int[numBrokers];
-        currentBrokerLogDirs = new String[numBrokers];
-        this.brokerConfig = brokerConfig;
-        // Since we support `stop` followed by `startOnlyKafkaOnSamePorts`, we track whether
-        // a listener config is defined during initialization in order to know if it's
-        // safe to override it
-        hasListenerConfig = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG) != null;
+        addDefaultBrokerPropsIfAbsent(brokerConfig, numBrokers);
+        try {
+            KafkaClusterTestKit.Builder clusterBuilder = new KafkaClusterTestKit.Builder(
+                new TestKitNodes.Builder()
+                    .setCombined(true)
+                    .setNumBrokerNodes(numBrokers)
+                    // Reduce number of controllers for faster startup
+                    // We may make this configurable in the future if there's a use case for it
+                    .setNumControllerNodes(1)
+                    .build()
+            );
+
+            brokerConfig.forEach((k, v) -> clusterBuilder.setConfigProp((String) k, v));
+            cluster = clusterBuilder.build();
+            cluster.nonFatalFaultHandler().setIgnore(true);
+        } catch (Exception e) {
+            throw new ConnectException("Failed to create test Kafka cluster", e);
+        }
+        this.brokerConfig = brokerConfig;
         this.clientConfigs = clientConfigs;
     }

-    /**
-     * Starts the Kafka cluster alone using the ports that were assigned during initialization of
-     * the harness.
-     *
-     * @throws ConnectException if a directory to store the data cannot be created
-     */
-    public void startOnlyKafkaOnSamePorts() {
-        doStart();
-    }
-
     public void start() {
-        // pick a random port
-        zookeeper = new EmbeddedZookeeper();
-        Arrays.fill(currentBrokerPorts, 0);
-        Arrays.fill(currentBrokerLogDirs, null);
-        doStart();
-    }
-
-    private void doStart() {
-        brokerConfig.put(ZkConfigs.ZK_CONNECT_CONFIG, zKConnectString());
-
-        putIfAbsent(brokerConfig, ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, true);
-        putIfAbsent(brokerConfig, GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0);
-        putIfAbsent(brokerConfig, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) brokers.length);
-        putIfAbsent(brokerConfig, AUTO_CREATE_TOPICS_ENABLE_CONFIG, false);
-        // reduce the size of the log cleaner map to reduce test memory usage
-        putIfAbsent(brokerConfig, CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
-
-        Object listenerConfig = brokerConfig.get(INTER_BROKER_LISTENER_NAME_CONFIG);
-        if (listenerConfig == null)
-            listenerConfig = brokerConfig.get(INTER_BROKER_SECURITY_PROTOCOL_CONFIG);
-        if (listenerConfig == null)
-            listenerConfig = "PLAINTEXT";
-        listenerName = new ListenerName(listenerConfig.toString());
-
-        for (int i = 0; i < brokers.length; i++) {
-            brokerConfig.put(ServerConfigs.BROKER_ID_CONFIG, i);
-            currentBrokerLogDirs[i] = currentBrokerLogDirs[i] == null ? createLogDir() : currentBrokerLogDirs[i];
-            brokerConfig.put(LOG_DIR_CONFIG, currentBrokerLogDirs[i]);
-            if (!hasListenerConfig)
-                brokerConfig.put(SocketServerConfigs.LISTENERS_CONFIG, listenerName.value() + "://localhost:" + currentBrokerPorts[i]);
-            brokers[i] = TestUtils.createServer(new KafkaConfig(brokerConfig, true), time);
-            currentBrokerPorts[i] = brokers[i].boundPort(listenerName);
+        try {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+        } catch (Exception e) {
+            throw new ConnectException("Failed to start test Kafka cluster", e);
         }

         Map<String, Object> producerProps = new HashMap<>(clientConfigs);
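The constructor and start() shown above delegate all broker management to KafkaClusterTestKit. A minimal sketch of driving the test kit directly with the same settings follows; the try-with-resources usage relies on the class implementing AutoCloseable (declared later in this diff), and everything else mirrors the calls visible above.

import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;

public class KRaftTestClusterSketch {
    public void runExample() throws Exception {
        // One combined broker+controller node, mirroring what EmbeddedKafkaCluster now does internally.
        KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(
            new TestKitNodes.Builder()
                .setCombined(true)
                .setNumBrokerNodes(1)
                .setNumControllerNodes(1)
                .build()
        );
        builder.setConfigProp("auto.create.topics.enable", "false");

        try (KafkaClusterTestKit cluster = builder.build()) {
            cluster.format();                  // write the initial metadata before startup
            cluster.startup();                 // start controllers and brokers
            cluster.waitForReadyBrokers();     // block until the brokers are ready
            String bootstrapServers = cluster.bootstrapServers();
            // ... run Kafka clients against bootstrapServers ...
        } // closing the test kit shuts the cluster down
    }
}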
@ -199,149 +155,65 @@ public class EmbeddedKafkaCluster {
     producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
 }

-public void stopOnlyKafka() {
-    stop(false, false);
+/**
+ * Restarts the Kafka brokers. This can be called after {@link #stopOnlyBrokers()}. Note that if the Kafka brokers
+ * need to be listening on the same ports as earlier, the {@link #brokerConfig} should contain the
+ * {@link SocketServerConfigs#LISTENERS_CONFIG} property and it should use a fixed non-zero free port. Also note that this is
+ * only possible when {@code numBrokers} is 1.
+ */
+public void restartOnlyBrokers() {
+    cluster.brokers().values().forEach(BrokerServer::startup);
+}
+
+/**
+ * Stop only the Kafka brokers (and not the KRaft controllers). This can be used to test Connect's functionality
+ * when the backing Kafka cluster goes offline.
+ */
+public void stopOnlyBrokers() {
+    cluster.brokers().values().forEach(BrokerServer::shutdown);
+    cluster.brokers().values().forEach(BrokerServer::awaitShutdown);
 }
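A rough sketch of how a test might exercise the new broker stop/restart hooks is shown below; per the javadoc above, a single broker and a fixed free port are needed for a restart on the same address, and the listener name and port used here are purely illustrative assumptions.

import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.network.SocketServerConfigs;

import java.util.Properties;

public class BrokerOutageSketch {
    public void simulateBrokerOutage() {
        Properties brokerProps = new Properties();
        // A fixed, free port lets the broker come back on the same address after a restart;
        // the listener string below is an assumption for illustration only.
        brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:9092");

        EmbeddedKafkaCluster kafka = new EmbeddedKafkaCluster(1, brokerProps);
        kafka.start();

        kafka.stopOnlyBrokers();     // brokers go away, the KRaft controllers keep running
        // ... assert that Connect (or another client) degrades gracefully while Kafka is offline ...
        kafka.restartOnlyBrokers();  // brokers come back on the same listener

        kafka.stop();
    }
}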
 public void stop() {
-    stop(true, true);
+    AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+    Utils.closeQuietly(producer, "producer for embedded Kafka cluster", shutdownFailure);
+    Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure);
+    if (shutdownFailure.get() != null) {
+        throw new ConnectException("Failed to shut down producer / embedded Kafka cluster", shutdownFailure.get());
+    }
 }
-
-private void stop(boolean deleteLogDirs, boolean stopZK) {
-    maybeShutDownProducer();
-    triggerBrokerShutdown();
-    awaitBrokerShutdown();
-
-    if (deleteLogDirs)
-        deleteLogDirs();
-
-    if (stopZK)
-        stopZK();
-}
-
-private void maybeShutDownProducer() {
-    try {
-        if (producer != null) {
-            producer.close();
-        }
-    } catch (Exception e) {
-        log.error("Could not shutdown producer ", e);
-        throw new RuntimeException("Could not shutdown producer", e);
-    }
-}
-
-private void triggerBrokerShutdown() {
-    for (KafkaServer broker : brokers) {
-        try {
-            broker.shutdown();
-        } catch (Throwable t) {
-            String msg = String.format("Could not shutdown broker at %s", address(broker));
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
-        }
-    }
-}
-
-private void awaitBrokerShutdown() {
-    for (KafkaServer broker : brokers) {
-        try {
-            broker.awaitShutdown();
-        } catch (Throwable t) {
-            String msg = String.format("Failed while awaiting shutdown of broker at %s", address(broker));
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
-        }
-    }
-}
-
-private void deleteLogDirs() {
-    for (KafkaServer broker : brokers) {
-        try {
-            log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs());
-            CoreUtils.delete(broker.config().logDirs());
-        } catch (Throwable t) {
-            String msg = String.format("Could not clean up log dirs for broker at %s",
-                address(broker));
-            log.error(msg, t);
-            throw new RuntimeException(msg, t);
-        }
-    }
-}
-
-private void stopZK() {
-    try {
-        zookeeper.shutdown();
-    } catch (Throwable t) {
-        String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString());
-        log.error(msg, t);
-        throw new RuntimeException(msg, t);
-    }
-}
-
-private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
-    if (!props.containsKey(propertyKey)) {
-        props.put(propertyKey, propertyValue);
-    }
-}
-
-private String createLogDir() {
-    try {
-        return Files.createTempDirectory(getClass().getSimpleName()).toString();
-    } catch (IOException e) {
-        log.error("Unable to create temporary log directory", e);
-        throw new ConnectException("Unable to create temporary log directory", e);
-    }
-}

 public String bootstrapServers() {
-    return Arrays.stream(brokers)
-        .map(this::address)
-        .collect(Collectors.joining(","));
-}
-
-public String address(KafkaServer server) {
-    final EndPoint endPoint = server.advertisedListeners().head();
-    return endPoint.host() + ":" + endPoint.port();
-}
-
-public String zKConnectString() {
-    return "127.0.0.1:" + zookeeper.port();
+    return cluster.bootstrapServers();
 }

 /**
  * Get the brokers that have a {@link BrokerState#RUNNING} state.
  *
- * @return the list of {@link KafkaServer} instances that are running;
+ * @return the set of {@link BrokerServer} instances that are running;
  * never null but possibly empty
  */
-public Set<KafkaServer> runningBrokers() {
-    return brokersInState(state -> state == BrokerState.RUNNING);
+public Set<BrokerServer> runningBrokers() {
+    return brokersInState(BrokerState.RUNNING::equals);
 }

 /**
  * Get the brokers whose state match the given predicate.
  *
- * @return the list of {@link KafkaServer} instances with states that match the predicate;
+ * @return the set of {@link BrokerServer} instances with states that match the predicate;
  * never null but possibly empty
  */
-public Set<KafkaServer> brokersInState(Predicate<BrokerState> desiredState) {
-    return Arrays.stream(brokers)
-        .filter(b -> hasState(b, desiredState))
+public Set<BrokerServer> brokersInState(Predicate<BrokerState> desiredState) {
+    return cluster.brokers().values().stream()
+        .filter(b -> desiredState.test(b.brokerState()))
         .collect(Collectors.toSet());
 }

-protected boolean hasState(KafkaServer server, Predicate<BrokerState> desiredState) {
-    try {
-        return desiredState.test(server.brokerState());
-    } catch (Throwable e) {
-        // Broker failed to respond.
-        return false;
-    }
-}
-
 public boolean sslEnabled() {
-    final String listeners = brokerConfig.getProperty(SocketServerConfigs.LISTENERS_CONFIG);
-    return listeners != null && listeners.contains("SSL");
+    final String listenerSecurityProtocolMap = brokerConfig.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG);
+    if (listenerSecurityProtocolMap == null)
+        return false;
+    return listenerSecurityProtocolMap.contains(":SSL") || listenerSecurityProtocolMap.contains(":SASL_SSL");
 }

 /**
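The new sslEnabled() check keys off the listener-to-protocol map rather than the listeners string. A small self-contained sketch of the same check, with an illustrative mapping value:

import java.util.Properties;

public class SslDetectionSketch {
    public static void main(String[] args) {
        Properties brokerConfig = new Properties();
        // SSL support is inferred from the listener -> protocol mapping rather than the listeners string.
        brokerConfig.put("listener.security.protocol.map", "EXTERNAL:SSL,CONTROLLER:PLAINTEXT");

        String map = brokerConfig.getProperty("listener.security.protocol.map");
        boolean sslEnabled = map != null && (map.contains(":SSL") || map.contains(":SASL_SSL"));
        System.out.println(sslEnabled); // prints true for the mapping above
    }
}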
@ -447,9 +319,9 @@ public class EmbeddedKafkaCluster {
  * @param adminClientConfig Additional admin client configuration settings.
  */
 public void createTopic(String topic, int partitions, int replication, Map<String, String> topicConfig, Map<String, Object> adminClientConfig) {
-    if (replication > brokers.length) {
+    if (replication > cluster.brokers().size()) {
         throw new InvalidReplicationFactorException("Insufficient brokers ("
-            + brokers.length + ") for desired replication (" + replication + ")");
+            + cluster.brokers().size() + ") for desired replication (" + replication + ")");
     }

     log.info("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",

@ -498,8 +370,7 @@ public class EmbeddedKafkaCluster {
 Properties props = Utils.mkProperties(clientConfigs);
 props.putAll(adminClientConfig);
 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
-final Object listeners = brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG);
-if (listeners != null && listeners.toString().contains("SSL")) {
+if (sslEnabled()) {
 props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
 props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
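A short sketch of how the replication check above surfaces to callers; the topic names are illustrative, while the five-argument createTopic overload and InvalidReplicationFactorException are taken from this diff. The cluster passed in is assumed to be already started.

import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;

import java.util.Collections;

public class CreateTopicSketch {
    public void createTopics(EmbeddedKafkaCluster kafka) {
        // Fine on a single-broker cluster: one partition, replication factor 1.
        kafka.createTopic("test-topic", 1, 1, Collections.emptyMap(), Collections.emptyMap());

        // Asking for more replicas than there are brokers now fails fast against cluster.brokers().size().
        try {
            kafka.createTopic("over-replicated", 1, 3, Collections.emptyMap(), Collections.emptyMap());
        } catch (InvalidReplicationFactorException e) {
            // expected on a one-broker cluster
        }
    }
}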
@ -702,16 +573,16 @@ public class EmbeddedKafkaCluster {
 Map<String, Object> props = new HashMap<>(clientConfigs);
 props.putAll(consumerProps);

-putIfAbsent(props, GROUP_ID_CONFIG, UUID.randomUUID().toString());
-putIfAbsent(props, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
-putIfAbsent(props, ENABLE_AUTO_COMMIT_CONFIG, "false");
-putIfAbsent(props, AUTO_OFFSET_RESET_CONFIG, "earliest");
-putIfAbsent(props, KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-putIfAbsent(props, VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+props.putIfAbsent(GROUP_ID_CONFIG, UUID.randomUUID().toString());
+props.putIfAbsent(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+props.putIfAbsent(ENABLE_AUTO_COMMIT_CONFIG, "false");
+props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest");
+props.putIfAbsent(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+props.putIfAbsent(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 if (sslEnabled()) {
-    putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
-    putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
-    putIfAbsent(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+    props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+    props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+    props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
 }
 KafkaConsumer<byte[], byte[]> consumer;
 try {

@ -731,13 +602,13 @@ public class EmbeddedKafkaCluster {
 public KafkaProducer<byte[], byte[]> createProducer(Map<String, Object> producerProps) {
 Map<String, Object> props = new HashMap<>(clientConfigs);
 props.putAll(producerProps);
-putIfAbsent(props, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
-putIfAbsent(props, KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-putIfAbsent(props, VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+props.putIfAbsent(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+props.putIfAbsent(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+props.putIfAbsent(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
 if (sslEnabled()) {
-    putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
-    putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
-    putIfAbsent(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+    props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+    props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+    props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
 }
 KafkaProducer<byte[], byte[]> producer;
 try {

@ -748,9 +619,9 @@ public class EmbeddedKafkaCluster {
 return producer;
 }

-private static void putIfAbsent(final Map<String, Object> props, final String propertyKey, final Object propertyValue) {
-    if (!props.containsKey(propertyKey)) {
-        props.put(propertyKey, propertyValue);
-    }
+private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) {
+    brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0");
+    brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, String.valueOf(numBrokers));
+    brokerConfig.putIfAbsent(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false");
 }
 }
@ -149,14 +149,14 @@ public class KafkaClusterTestKit implements AutoCloseable {

 public static class Builder {
     private TestKitNodes nodes;
-    private Map<String, String> configProps = new HashMap<>();
-    private SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory();
+    private final Map<String, Object> configProps = new HashMap<>();
+    private final SimpleFaultHandlerFactory faultHandlerFactory = new SimpleFaultHandlerFactory();

     public Builder(TestKitNodes nodes) {
         this.nodes = nodes;
     }

-    public Builder setConfigProp(String key, String value) {
+    public Builder setConfigProp(String key, Object value) {
         this.configProps.put(key, value);
         return this;
     }

@ -165,7 +165,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
 BrokerNode brokerNode = nodes.brokerNodes().get(node.id());
 ControllerNode controllerNode = nodes.controllerNodes().get(node.id());

-Map<String, String> props = new HashMap<>(configProps);
+Map<String, Object> props = new HashMap<>(configProps);
 props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
     Long.toString(TimeUnit.MINUTES.toMillis(10)));
 props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, roles(node.id()));

@ -188,13 +188,16 @@
 props.put(LOG_DIRS_CONFIG,
     controllerNode.metadataDirectory());
 }
-props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
-    "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
-props.put(SocketServerConfigs.LISTENERS_CONFIG, listeners(node.id()));
-props.put(INTER_BROKER_LISTENER_NAME_CONFIG,
-    nodes.interBrokerListenerName().value());
-props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
-    "CONTROLLER");
+// We allow configuring the listeners and related properties via Builder::setConfigProp,
+// and they shouldn't be overridden here
+props.putIfAbsent(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
+    "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+props.putIfAbsent(SocketServerConfigs.LISTENERS_CONFIG, listeners(node.id()));
+props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG,
+    nodes.interBrokerListenerName().value());
+props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");

 // Note: we can't accurately set controller.quorum.voters yet, since we don't
 // yet know what ports each controller will pick. Set it to a dummy string
 // for now as a placeholder.

@ -257,7 +260,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
     nodes.bootstrapMetadata());
 } catch (Throwable e) {
     log.error("Error creating controller {}", node.id(), e);
-    Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", () -> sharedServer.stopForController());
+    Utils.swallow(log, Level.WARN, "sharedServer.stopForController error", sharedServer::stopForController);
     throw e;
 }
 controllers.put(node.id(), controller);

@ -288,7 +291,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
 broker = new BrokerServer(sharedServer);
 } catch (Throwable e) {
     log.error("Error creating broker {}", node.id(), e);
-    Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", () -> sharedServer.stopForBroker());
+    Utils.swallow(log, Level.WARN, "sharedServer.stopForBroker error", sharedServer::stopForBroker);
     throw e;
 }
 brokers.put(node.id(), broker);
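With setConfigProp now accepting Object values, callers such as EmbeddedKafkaCluster can pass broker properties through without stringifying them first, and listener-related defaults are only applied when the builder did not already supply them. A minimal sketch, with illustrative config keys and values:

import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;

public class SetConfigPropSketch {
    public KafkaClusterTestKit buildCluster() throws Exception {
        KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(
            new TestKitNodes.Builder()
                .setCombined(true)
                .setNumBrokerNodes(1)
                .setNumControllerNodes(1)
                .build()
        );
        // Non-String values can now be passed straight through to the broker configuration.
        builder.setConfigProp("offsets.topic.replication.factor", (short) 1);
        builder.setConfigProp("group.initial.rebalance.delay.ms", 0);
        return builder.build();
    }
}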