KAFKA-10000: Integration tests (#11782)

Implements embedded end-to-end integration tests for KIP-618, and brings together previously-decoupled logic from upstream PRs.

Reviewers: Luke Chen <showuon@gmail.com>, Tom Bentley <tbentley@redhat.com>, Mickael Maison <mickael.maison@gmail.com>
Chris Egerton authored 2022-07-05 22:35:05 -04:00, committed by GitHub
parent 448441a35d
commit 3ae1afa438
8 changed files with 1418 additions and 23 deletions

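As a rough illustration of what these embedded end-to-end tests exercise, the sketch below wires up an EmbeddedConnectCluster with KIP-618's exactly.once.source.support worker property and configures the MonitorableSourceConnector used throughout this commit. The broker overrides, names, and connector settings are illustrative assumptions, not copied from the new test classes.

import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ExactlyOnceSourceSmokeSketch {
    public static void main(String[] args) {
        // Assumption: a small embedded cluster needs these overrides to host the transaction state log.
        Properties brokerProps = new Properties();
        brokerProps.put("transaction.state.log.replication.factor", "1");
        brokerProps.put("transaction.state.log.min.isr", "1");

        // KIP-618 worker-level switch for exactly-once source support.
        Map<String, String> workerProps = new HashMap<>();
        workerProps.put("exactly.once.source.support", "enabled");

        EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
                .name("exactly-once-source-cluster")
                .numWorkers(3)
                .brokerProps(brokerProps)
                .workerProps(workerProps)
                .build();
        connect.start();

        Map<String, String> connectorProps = new HashMap<>();
        connectorProps.put("connector.class", "MonitorableSourceConnector");
        connectorProps.put("tasks.max", "1");
        connectorProps.put("topic", "test-topic");
        connect.configureConnector("exactly-once-source", connectorProps);

        // ... produce, consume with read_committed isolation, and assert on records here ...
        connect.stop();
    }
}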
View File

@@ -196,6 +196,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
List<SourceRecord> toSend;
protected Map<String, String> taskConfig;
protected boolean started = false;
private volatile boolean producerClosed = false;
protected AbstractWorkerSourceTask(ConnectorTaskId id,
SourceTask task,
@@ -315,6 +316,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
private void closeProducer(Duration duration) {
if (producer != null) {
producerClosed = true;
Utils.closeQuietly(() -> producer.close(duration), "source task producer");
}
}
@@ -397,7 +399,11 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
producerRecord,
(recordMetadata, e) -> {
if (e != null) {
log.debug("{} failed to send record to {}: ", AbstractWorkerSourceTask.this, topic, e);
if (producerClosed) {
log.trace("{} failed to send record to {}; this is expected as the producer has already been closed", AbstractWorkerSourceTask.this, topic, e);
} else {
log.error("{} failed to send record to {}: ", AbstractWorkerSourceTask.this, topic, e);
}
log.trace("{} Failed record: {}", AbstractWorkerSourceTask.this, preTransformRecord);
producerSendFailed(false, producerRecord, preTransformRecord, e);
} else {

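The change above boils down to a small pattern: flip a volatile flag before closing the producer so that the asynchronous send callback can tell an expected failure (records still in flight when the producer was shut down) apart from a genuine one. A minimal standalone sketch of that pattern, outside of Connect's classes:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Utils;

import java.time.Duration;
import java.util.Map;

public class ClosableSender {
    private final KafkaProducer<byte[], byte[]> producer;
    // Written by the thread that closes the producer, read by the producer's callback thread.
    private volatile boolean producerClosed = false;

    public ClosableSender(Map<String, Object> producerProps) {
        // producerProps must include bootstrap servers and byte-array serializers.
        this.producer = new KafkaProducer<>(producerProps);
    }

    public void send(String topic, byte[] value) {
        producer.send(new ProducerRecord<>(topic, value), (metadata, e) -> {
            if (e == null) {
                return;
            }
            if (producerClosed) {
                // Expected: the record was still in flight when close() was called.
                System.out.println("Send failed because the producer was closed: " + e);
            } else {
                // Unexpected: surface as a real error.
                System.err.println("Send failed: " + e);
            }
        });
    }

    public void close(Duration timeout) {
        producerClosed = true; // flip the flag before closing, as the hunk above does
        Utils.closeQuietly(() -> producer.close(timeout), "producer");
    }
}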
View File

@@ -174,8 +174,6 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
);
commitTaskRecord(preTransformRecord, null);
} else {
log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
producerSendException.compareAndSet(null, e);
}
}

View File

@@ -332,6 +332,7 @@ public class ConnectorsResource implements ConnectResource {
@PUT
@Path("/{connector}/fence")
@Operation(hidden = true, summary = "This operation is only for inter-worker communications")
public void fenceZombies(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward,

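For context, the endpoint annotated above is the internal zombie-fencing resource that workers call on each other; on the wire it is a PUT against the path shown. A hedged sketch of that request shape using java.net.http follows; the worker URL and connector name are assumptions, and any request body or internal request signing the workers may apply is ignored here.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FenceRequestSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/my-connector/fence")) // assumed worker URL
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("Fence request returned HTTP " + response.statusCode());
    }
}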
View File

@@ -112,6 +112,14 @@ public class ConnectorHandle {
taskHandles.remove(taskId);
}
/**
* Delete all task handles for this connector.
*/
public void clearTasks() {
log.info("Clearing {} existing task handles for connector {}", taskHandles.size(), connectorName);
taskHandles.clear();
}
/**
* Set the number of expected records for this connector.
*

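A hedged example of where the new clearTasks() helper fits in a test, between deleting a connector and bringing it back with fresh tasks; the connector and task names are illustrative.

// Drop stale task handles so later expectations only track the tasks about to be (re)created.
ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle("exactly-once-source");
connectorHandle.clearTasks();
// Handles for the new tasks are then obtained the same way the tasks themselves register them:
TaskHandle task0 = connectorHandle.taskHandle("exactly-once-source-0");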
View File

@@ -20,8 +20,11 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.runtime.SampleSourceConnector;
import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.tools.ThroughputThrottler;
@@ -32,6 +35,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -47,6 +51,20 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class);
public static final String TOPIC_CONFIG = "topic";
public static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll";
public static final String CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG = "custom.exactly.once.support";
public static final String EXACTLY_ONCE_SUPPORTED = "supported";
public static final String EXACTLY_ONCE_UNSUPPORTED = "unsupported";
public static final String EXACTLY_ONCE_NULL = "null";
public static final String EXACTLY_ONCE_FAIL = "fail";
public static final String CUSTOM_TRANSACTION_BOUNDARIES_CONFIG = "custom.transaction.boundaries";
public static final String TRANSACTION_BOUNDARIES_SUPPORTED = "supported";
public static final String TRANSACTION_BOUNDARIES_UNSUPPORTED = "unsupported";
public static final String TRANSACTION_BOUNDARIES_NULL = "null";
public static final String TRANSACTION_BOUNDARIES_FAIL = "fail";
private String connectorName;
private ConnectorHandle connectorHandle;
private Map<String, String> commonConfigs;
@@ -74,7 +92,7 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<>(commonConfigs);
config.put("connector.name", connectorName);
config.put("task.id", connectorName + "-" + i);
config.put("task.id", taskId(connectorName, i));
configs.add(config);
}
return configs;
@@ -92,18 +110,55 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
return new ConfigDef();
}
@Override
public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
String supportLevel = connectorConfig.getOrDefault(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, "null").toLowerCase(Locale.ROOT);
switch (supportLevel) {
case EXACTLY_ONCE_SUPPORTED:
return ExactlyOnceSupport.SUPPORTED;
case EXACTLY_ONCE_UNSUPPORTED:
return ExactlyOnceSupport.UNSUPPORTED;
case EXACTLY_ONCE_FAIL:
throw new ConnectException("oops");
default:
case EXACTLY_ONCE_NULL:
return null;
}
}
@Override
public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
String supportLevel = connectorConfig.getOrDefault(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, TRANSACTION_BOUNDARIES_UNSUPPORTED).toLowerCase(Locale.ROOT);
switch (supportLevel) {
case TRANSACTION_BOUNDARIES_SUPPORTED:
return ConnectorTransactionBoundaries.SUPPORTED;
case TRANSACTION_BOUNDARIES_FAIL:
throw new ConnectException("oh no :(");
case TRANSACTION_BOUNDARIES_NULL:
return null;
default:
case TRANSACTION_BOUNDARIES_UNSUPPORTED:
return ConnectorTransactionBoundaries.UNSUPPORTED;
}
}
public static String taskId(String connectorName, int taskId) {
return connectorName + "-" + taskId;
}
public static class MonitorableSourceTask extends SourceTask {
private String connectorName;
private String taskId;
private String topicName;
private TaskHandle taskHandle;
private volatile boolean stopped;
private long startingSeqno;
private long seqno;
private long throughput;
private int batchSize;
private ThroughputThrottler throttler;
private long priorTransactionBoundary;
private long nextTransactionBoundary;
@Override
public String version() {
return "unknown";
@@ -112,21 +167,24 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
@Override
public void start(Map<String, String> props) {
taskId = props.get("task.id");
connectorName = props.get("connector.name");
String connectorName = props.get("connector.name");
topicName = props.getOrDefault(TOPIC_CONFIG, "sequential-topic");
throughput = Long.parseLong(props.getOrDefault("throughput", "-1"));
batchSize = Integer.parseInt(props.getOrDefault("messages.per.poll", "1"));
batchSize = Integer.parseInt(props.getOrDefault(MESSAGES_PER_POLL_CONFIG, "1"));
taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
Map<String, Object> offset = Optional.ofNullable(
context.offsetStorageReader().offset(Collections.singletonMap("task.id", taskId)))
.orElse(Collections.emptyMap());
startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
seqno = startingSeqno;
log.info("Started {} task {} with properties {}", this.getClass().getSimpleName(), taskId, props);
throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
throttler = new ThroughputThrottler(Long.parseLong(props.getOrDefault("throughput", "-1")), System.currentTimeMillis());
taskHandle.recordTaskStart();
priorTransactionBoundary = 0;
nextTransactionBoundary = 1;
if (Boolean.parseBoolean(props.getOrDefault("task-" + taskId + ".start.inject.error", "false"))) {
throw new RuntimeException("Injecting errors during task start");
}
calculateNextBoundary();
}
@Override
@@ -136,11 +194,13 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
throttler.throttle();
}
taskHandle.record(batchSize);
log.info("Returning batch of {} records", batchSize);
log.trace("Returning batch of {} records", batchSize);
return LongStream.range(0, batchSize)
.mapToObj(i -> new SourceRecord(
Collections.singletonMap("task.id", taskId),
Collections.singletonMap("saved", ++seqno),
.mapToObj(i -> {
seqno++;
SourceRecord record = new SourceRecord(
sourcePartition(taskId),
sourceOffset(seqno),
topicName,
null,
Schema.STRING_SCHEMA,
@@ -148,7 +208,10 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
Schema.STRING_SCHEMA,
"value-" + taskId + "-" + seqno,
null,
new ConnectHeaders().addLong("header-" + seqno, seqno)))
new ConnectHeaders().addLong("header-" + seqno, seqno));
maybeDefineTransactionBoundary(record);
return record;
})
.collect(Collectors.toList());
}
return null;
@@ -172,5 +235,43 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
stopped = true;
taskHandle.recordTaskStop();
}
/**
* Calculate the next transaction boundary, i.e., the seqno whose corresponding source record should be used to
* either {@link org.apache.kafka.connect.source.TransactionContext#commitTransaction(SourceRecord) commit}
* or {@link org.apache.kafka.connect.source.TransactionContext#abortTransaction(SourceRecord) abort} the next transaction.
* <p>
* This connector defines transactions whose size correspond to successive elements of the Fibonacci sequence,
* where transactions with an even number of records are aborted, and those with an odd number of records are committed.
*/
private void calculateNextBoundary() {
while (nextTransactionBoundary <= seqno) {
nextTransactionBoundary += priorTransactionBoundary;
priorTransactionBoundary = nextTransactionBoundary - priorTransactionBoundary;
}
}
private void maybeDefineTransactionBoundary(SourceRecord record) {
if (context.transactionContext() == null || seqno != nextTransactionBoundary) {
return;
}
// If the transaction boundary ends on an even-numbered offset, abort it
// Otherwise, commit
boolean abort = nextTransactionBoundary % 2 == 0;
calculateNextBoundary();
if (abort) {
context.transactionContext().abortTransaction(record);
} else {
context.transactionContext().commitTransaction(record);
}
}
}
public static Map<String, Object> sourcePartition(String taskId) {
return Collections.singletonMap("task.id", taskId);
}
public static Map<String, Object> sourceOffset(long seqno) {
return Collections.singletonMap("saved", seqno);
}
}
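To make the schedule described in the Javadoc above concrete, the standalone sketch below replays the same arithmetic as calculateNextBoundary() and maybeDefineTransactionBoundary() and prints the first few boundaries; note that, as written, the task keys the abort decision off the parity of the boundary seqno itself.

public class TransactionBoundarySchedule {
    public static void main(String[] args) {
        long prior = 0;
        long next = 1;
        for (int i = 0; i < 8; i++) {
            // Mirrors maybeDefineTransactionBoundary(): the record whose seqno equals `next`
            // ends the current transaction; even boundaries are aborted, odd ones committed.
            System.out.printf("transaction ends at seqno %d -> %s%n",
                    next, next % 2 == 0 ? "abort" : "commit");
            // Mirrors calculateNextBoundary(): a Fibonacci step without a temporary variable,
            // advanced until the next boundary lies strictly beyond the current seqno.
            long seqno = next;
            while (next <= seqno) {
                next += prior;
                prior = next - prior;
            }
        }
        // Prints boundaries 1, 2, 3, 5, 8, 13, 21, 34 -> transaction sizes 1, 1, 1, 2, 3, 5, 8, 13.
    }
}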

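And a hedged sketch of a connector configuration that exercises the new knobs, combining the constants defined above with the KIP-618 connector-level properties; `connect` is assumed to be a running EmbeddedConnectCluster, and names and values are illustrative.

Map<String, String> props = new HashMap<>();
props.put("connector.class", "MonitorableSourceConnector");
props.put("tasks.max", "1");
props.put(MonitorableSourceConnector.TOPIC_CONFIG, "test-topic");
props.put(MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG, "5");
// Drive the new overrides in MonitorableSourceConnector:
props.put(MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_SUPPORTED);
props.put(MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_SUPPORTED);
// KIP-618 connector-level properties: require exactly-once delivery and let the connector
// define its own transaction boundaries.
props.put("exactly.once.support", "required");
props.put("transaction.boundary", "connector");
connect.configureConnector("exactly-once-source", props);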
View File

@@ -47,6 +47,9 @@ public class EmbeddedConnectClusterAssertions {
public static final long WORKER_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(5);
public static final long VALIDATION_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
public static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(2);
// Creating a connector requires two rounds of rebalance; destroying one only requires one
// Assume it'll take ~half the time to destroy a connector as it does to create one
public static final long CONNECTOR_SHUTDOWN_DURATION_MS = TimeUnit.MINUTES.toMillis(1);
private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
private final EmbeddedConnectCluster connect;

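One way the new constant might be used in a test, sketched with org.apache.kafka.test.TestUtils.waitForCondition; the connector name and the REST-based check are assumptions.

connect.deleteConnector("exactly-once-source");
TestUtils.waitForCondition(
        () -> !connect.connectors().contains("exactly-once-source"),
        EmbeddedConnectClusterAssertions.CONNECTOR_SHUTDOWN_DURATION_MS,
        "Connector was not shut down within the expected timeout");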
View File

@@ -26,14 +26,18 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
@@ -45,6 +49,7 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.metadata.BrokerState;
import org.slf4j.Logger;
@@ -55,9 +60,11 @@ import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -66,6 +73,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -75,6 +84,9 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMI
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.junit.Assert.assertFalse;
/**
* Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for
@@ -439,9 +451,23 @@ public class EmbeddedKafkaCluster {
* @return a {@link ConsumerRecords} collection containing at least n records.
*/
public ConsumerRecords<byte[], byte[]> consume(int n, long maxDuration, String... topics) {
return consume(n, maxDuration, Collections.emptyMap(), topics);
}
/**
* Consume at least n records in a given duration or throw an exception.
*
* @param n the number of expected records in this topic.
* @param maxDuration the max duration to wait for these records (in milliseconds).
* @param topics the topics to subscribe and consume records from.
* @param consumerProps overrides to the default properties the consumer is constructed with;
* may not be null
* @return a {@link ConsumerRecords} collection containing at least n records.
*/
public ConsumerRecords<byte[], byte[]> consume(int n, long maxDuration, Map<String, Object> consumerProps, String... topics) {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
int consumedRecords = 0;
try (KafkaConsumer<byte[], byte[]> consumer = createConsumerAndSubscribeTo(Collections.emptyMap(), topics)) {
try (KafkaConsumer<byte[], byte[]> consumer = createConsumerAndSubscribeTo(consumerProps, topics)) {
final long startMillis = System.currentTimeMillis();
long allowedDuration = maxDuration;
while (allowedDuration > 0) {
@@ -466,6 +492,108 @@ public class EmbeddedKafkaCluster {
throw new RuntimeException("Could not find enough records. found " + consumedRecords + ", expected " + n);
}
/**
* Consume all currently-available records for the specified topics in a given duration, or throw an exception.
* @param maxDurationMs the max duration to wait for these records (in milliseconds).
* @param consumerProps overrides to the default properties the consumer is constructed with; may be null
* @param adminProps overrides to the default properties the admin used to query Kafka cluster metadata is constructed with; may be null
* @param topics the topics to consume from
* @return a {@link ConsumerRecords} collection containing the records for all partitions of the given topics
*/
public ConsumerRecords<byte[], byte[]> consumeAll(
long maxDurationMs,
Map<String, Object> consumerProps,
Map<String, Object> adminProps,
String... topics
) throws TimeoutException, InterruptedException, ExecutionException {
long endTimeMs = System.currentTimeMillis() + maxDurationMs;
Consumer<byte[], byte[]> consumer = createConsumer(consumerProps != null ? consumerProps : Collections.emptyMap());
Admin admin = createAdminClient(Utils.mkObjectProperties(adminProps != null ? adminProps : Collections.emptyMap()));
long remainingTimeMs = endTimeMs - System.currentTimeMillis();
Set<TopicPartition> topicPartitions = listPartitions(remainingTimeMs, admin, Arrays.asList(topics));
remainingTimeMs = endTimeMs - System.currentTimeMillis();
Map<TopicPartition, Long> endOffsets = readEndOffsets(remainingTimeMs, admin, topicPartitions);
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = topicPartitions.stream()
.collect(Collectors.toMap(
Function.identity(),
tp -> new ArrayList<>()
));
consumer.assign(topicPartitions);
while (!endOffsets.isEmpty()) {
Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TopicPartition, Long> entry = it.next();
TopicPartition topicPartition = entry.getKey();
long endOffset = entry.getValue();
long lastConsumedOffset = consumer.position(topicPartition);
if (lastConsumedOffset >= endOffset) {
// We've reached the end offset for the topic partition; can stop polling it now
it.remove();
} else {
remainingTimeMs = endTimeMs - System.currentTimeMillis();
if (remainingTimeMs <= 0) {
throw new AssertionError("failed to read to end of topic(s) " + Arrays.asList(topics) + " within " + maxDurationMs + "ms");
}
// We haven't reached the end offset yet; need to keep polling
ConsumerRecords<byte[], byte[]> recordBatch = consumer.poll(Duration.ofMillis(remainingTimeMs));
recordBatch.partitions().forEach(tp -> records.get(tp)
.addAll(recordBatch.records(tp))
);
}
}
}
return new ConsumerRecords<>(records);
}
/**
* List all the known partitions for the given {@link Collection} of topics
* @param maxDurationMs the max duration to wait for while fetching metadata from Kafka (in milliseconds).
* @param admin the admin client to use for fetching metadata from the Kafka cluster
* @param topics the topics whose partitions should be listed
* @return a {@link Set} of {@link TopicPartition topic partitions} for the given topics; never null, and never empty
*/
private Set<TopicPartition> listPartitions(
long maxDurationMs,
Admin admin,
Collection<String> topics
) throws TimeoutException, InterruptedException, ExecutionException {
assertFalse("collection of topics may not be empty", topics.isEmpty());
return admin.describeTopics(topics)
.allTopicNames().get(maxDurationMs, TimeUnit.MILLISECONDS)
.entrySet().stream()
.flatMap(e -> e.getValue().partitions().stream().map(p -> new TopicPartition(e.getKey(), p.partition())))
.collect(Collectors.toSet());
}
/**
* List the latest current offsets for the given {@link Collection} of {@link TopicPartition topic partitions}
* @param maxDurationMs the max duration to wait for while fetching metadata from Kafka (in milliseconds)
* @param admin the admin client to use for fetching metadata from the Kafka cluster
* @param topicPartitions the topic partitions to list end offsets for
* @return a {@link Map} containing the latest offset for each requested {@link TopicPartition topic partition}; never null, and never empty
*/
private Map<TopicPartition, Long> readEndOffsets(
long maxDurationMs,
Admin admin,
Collection<TopicPartition> topicPartitions
) throws TimeoutException, InterruptedException, ExecutionException {
assertFalse("collection of topic partitions may not be empty", topicPartitions.isEmpty());
Map<TopicPartition, OffsetSpec> offsetSpecMap = topicPartitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
return admin.listOffsets(offsetSpecMap, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
.all().get(maxDurationMs, TimeUnit.MILLISECONDS)
.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().offset()
));
}
public KafkaConsumer<byte[], byte[]> createConsumer(Map<String, Object> consumerProps) {
Map<String, Object> props = new HashMap<>(consumerProps);
@@ -495,6 +623,26 @@ public class EmbeddedKafkaCluster {
return consumer;
}
public KafkaProducer<byte[], byte[]> createProducer(Map<String, Object> producerProps) {
Map<String, Object> props = new HashMap<>(producerProps);
putIfAbsent(props, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
putIfAbsent(props, KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
putIfAbsent(props, VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
if (sslEnabled()) {
putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
putIfAbsent(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
}
KafkaProducer<byte[], byte[]> producer;
try {
producer = new KafkaProducer<>(props);
} catch (Throwable t) {
throw new ConnectException("Failed to create producer", t);
}
return producer;
}
private static void putIfAbsent(final Map<String, Object> props, final String propertyKey, final Object propertyValue) {
if (!props.containsKey(propertyKey)) {
props.put(propertyKey, propertyValue);
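
Finally, a hedged usage sketch for the consumer-side additions in this file: exactly-once tests typically read with read_committed isolation, either waiting for a minimum number of records or draining everything currently in the topic. Here `connect` is assumed to be a running EmbeddedConnectCluster and the topic name is illustrative.

Map<String, Object> readCommitted = Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

// Block until at least 100 committed records have been read, or fail after one minute.
ConsumerRecords<byte[], byte[]> atLeastOneHundred =
        connect.kafka().consume(100, TimeUnit.MINUTES.toMillis(1), readCommitted, "test-topic");

// Or drain every committed record currently in the topic, however many there are;
// passing null admin overrides falls back to the cluster defaults.
ConsumerRecords<byte[], byte[]> everything =
        connect.kafka().consumeAll(TimeUnit.MINUTES.toMillis(1), readCommitted, null, "test-topic");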