diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 2a93275083d..3bc5f4dbcb7 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -249,7 +249,7 @@ files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest|StreamTaskTest).java"/> + files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest|IQv2StoreIntegrationTest).java"/> diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java deleted file mode 100644 index 80a46b3437e..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.tests; - -import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer; -import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Exit; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; -import org.apache.kafka.streams.internals.ConsumerWrapper; -import org.apache.kafka.streams.kstream.KGroupedStream; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Produced; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -public class EosTestClient extends SmokeTestUtil { - - static final String APP_ID = "EosTest"; - private final Properties properties; - private final boolean withRepartitioning; - private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false); - private static final List CAPTURING_CONSUMER_WRAPPERS = new ArrayList<>(); - private int minGroupEpoch = 0; - - private KafkaStreams streams; - private boolean uncaughtException; - - EosTestClient(final Properties properties, final boolean withRepartitioning) { - super(); - this.properties = properties; - this.withRepartitioning = withRepartitioning; - this.properties.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER, CapturingConsumerWrapper.class); - CAPTURING_CONSUMER_WRAPPERS.clear(); - } - - private 
volatile boolean isRunning = true; - - public void start() { - Exit.addShutdownHook("streams-shutdown-hook", () -> { - isRunning = false; - streams.close(Duration.ofSeconds(300)); - - // need to wait for callback to avoid race condition - // -> make sure the callback printout to stdout is there as it is expected test output - waitForStateTransitionCallback(); - - // do not remove these printouts since they are needed for health scripts - if (!uncaughtException) { - System.out.println(System.currentTimeMillis()); - System.out.println("EOS-TEST-CLIENT-CLOSED"); - System.out.flush(); - } - }); - - while (isRunning) { - if (streams == null) { - uncaughtException = false; - - streams = createKafkaStreams(properties); - streams.setUncaughtExceptionHandler(e -> { - System.out.println(System.currentTimeMillis()); - System.out.println("EOS-TEST-CLIENT-EXCEPTION"); - e.printStackTrace(); - System.out.flush(); - uncaughtException = true; - return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - }); - streams.setStateListener((newState, oldState) -> { - // don't remove this -- it's required test output - System.out.println(System.currentTimeMillis()); - System.out.println("StateChange: " + oldState + " -> " + newState); - System.out.flush(); - if (newState == KafkaStreams.State.NOT_RUNNING) { - notRunningCallbackReceived.set(true); - } - }); - streams.start(); - } - if (uncaughtException) { - streams.close(Duration.ofSeconds(60_000L)); - streams = null; - } - logGroupEpochBump(); - sleep(100); - } - } - - private KafkaStreams createKafkaStreams(final Properties props) { - props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); - props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); - props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2); - props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, Duration.ofMinutes(1).toMillis()); - props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, Integer.MAX_VALUE); - props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); - props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L); // increase commit interval to make sure a client is killed having an open transaction - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); - - final StreamsBuilder builder = new StreamsBuilder(); - final KStream data = builder.stream("data"); - - data.to("echo"); - data.process(SmokeTestUtil.printProcessorSupplier("data")); - - final KGroupedStream groupedData = data.groupByKey(); - // min - groupedData - .aggregate( - () -> Integer.MAX_VALUE, - (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate, - Materialized.with(null, intSerde)) - .toStream() - .to("min", Produced.with(stringSerde, intSerde)); - - // sum - groupedData.aggregate( - () -> 0L, - (aggKey, value, aggregate) -> (long) value + aggregate, - Materialized.with(null, longSerde)) - .toStream() - .to("sum", Produced.with(stringSerde, longSerde)); - - if (withRepartitioning) { - data.to("repartition"); - final KStream repartitionedData = builder.stream("repartition"); - - repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition")); - - final KGroupedStream groupedDataAfterRepartitioning = repartitionedData.groupByKey(); - // max - groupedDataAfterRepartitioning - .aggregate( - () -> Integer.MIN_VALUE, - (aggKey, value, aggregate) -> (value > aggregate) ? 
value : aggregate, - Materialized.with(null, intSerde)) - .toStream() - .to("max", Produced.with(stringSerde, intSerde)); - - // count - groupedDataAfterRepartitioning.count() - .toStream() - .to("cnt", Produced.with(stringSerde, longSerde)); - } - - return new KafkaStreams(builder.build(), props); - } - - private void waitForStateTransitionCallback() { - final long maxWaitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(300); - while (!notRunningCallbackReceived.get() && System.currentTimeMillis() < maxWaitTime) { - try { - Thread.sleep(500); - } catch (final InterruptedException ignoreAndSwallow) { /* just keep waiting */ } - } - if (!notRunningCallbackReceived.get()) { - System.err.println("State transition callback to NOT_RUNNING never received. Timed out after 5 minutes."); - System.err.flush(); - } - } - - // Used in the streams group protocol - // Detect a completed rebalance by checking if the group epoch has been bumped for all threads. - private void logGroupEpochBump() { - int currentMin = Integer.MAX_VALUE; - for (final CapturingConsumerWrapper consumer : CAPTURING_CONSUMER_WRAPPERS) { - final int groupEpoch = consumer.lastSeenGroupEpoch; - if (groupEpoch < currentMin) { - currentMin = groupEpoch; - } - } - if (currentMin > minGroupEpoch) { - System.out.println("MemberEpochBump"); - } - if (currentMin != Integer.MAX_VALUE) { - minGroupEpoch = currentMin; - } - } - - public static class CapturingConsumerWrapper extends ConsumerWrapper { - - public volatile int lastSeenGroupEpoch = 0; - - @Override - public void wrapConsumer(final AsyncKafkaConsumer delegate, final Map config, final Optional streamsRebalanceData) { - CAPTURING_CONSUMER_WRAPPERS.add(this); - super.wrapConsumer(delegate, config, streamsRebalanceData); - } - - @Override - public ConsumerGroupMetadata groupMetadata() { - final ConsumerGroupMetadata consumerGroupMetadata = delegate.groupMetadata(); - lastSeenGroupEpoch = consumerGroupMetadata.generationId(); - return consumerGroupMetadata; - } - } - -} diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java deleted file mode 100644 index 0815c49db07..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ /dev/null @@ -1,678 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.tests; - -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.ConsumerGroupDescription; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; -import org.apache.kafka.clients.admin.StreamsGroupDescription; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.IsolationLevel; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Exit; -import org.apache.kafka.common.utils.Utils; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -public class EosTestDriver extends SmokeTestUtil { - - private static final int MAX_NUMBER_OF_KEYS = 20000; - private static final long MAX_IDLE_TIME_MS = 600000L; - - private static volatile boolean isRunning = true; - private static final CountDownLatch TERMINATED = new CountDownLatch(1); - - private static int numRecordsProduced = 0; - - private static synchronized void updateNumRecordsProduces(final int delta) { - numRecordsProduced += delta; - } - - static void generate(final String kafka) { - Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> { - System.out.println("Terminating"); - isRunning = false; - - try { - if (TERMINATED.await(5L, TimeUnit.MINUTES)) { - System.out.println("Terminated"); - } else { - System.out.println("Terminated with timeout"); - } - } catch (final InterruptedException swallow) { - swallow.printStackTrace(System.err); - System.out.println("Terminated with error"); - } - System.err.flush(); - System.out.flush(); - }); - - final Properties producerProps = new Properties(); - producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest"); - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - - final Map> offsets = new HashMap<>(); - - try { - try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { - final Random rand = new Random(System.currentTimeMillis()); - - while (isRunning) { - final String key = "" + 
rand.nextInt(MAX_NUMBER_OF_KEYS); - final int value = rand.nextInt(10000); - - final ProducerRecord record = new ProducerRecord<>("data", key, value); - - producer.send(record, (metadata, exception) -> { - if (exception != null) { - exception.printStackTrace(System.err); - System.err.flush(); - if (exception instanceof TimeoutException) { - try { - // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time - final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]); - updateNumRecordsProduces(-expired); - } catch (final Exception ignore) { - } - } - } else { - offsets.getOrDefault(metadata.partition(), new LinkedList<>()).add(metadata.offset()); - } - }); - - updateNumRecordsProduces(1); - if (numRecordsProduced % 1000 == 0) { - System.out.println(numRecordsProduced + " records produced"); - System.out.flush(); - } - Utils.sleep(rand.nextInt(10)); - } - } - System.out.println("Producer closed: " + numRecordsProduced + " records produced"); - System.out.flush(); - - // verify offsets - for (final Map.Entry> offsetsOfPartition : offsets.entrySet()) { - offsetsOfPartition.getValue().sort(Long::compareTo); - for (int i = 0; i < offsetsOfPartition.getValue().size() - 1; ++i) { - if (offsetsOfPartition.getValue().get(i) != i) { - System.err.println("Offset for partition " + offsetsOfPartition.getKey() + " is not " + i + " as expected but " + offsetsOfPartition.getValue().get(i)); - System.err.flush(); - } - } - System.out.println("Max offset of partition " + offsetsOfPartition.getKey() + " is " + offsetsOfPartition.getValue().get(offsetsOfPartition.getValue().size() - 1)); - } - - final Properties props = new Properties(); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString()); - - try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { - final List partitions = getAllPartitions(consumer, "data"); - System.out.println("Partitions: " + partitions); - System.out.flush(); - consumer.assign(partitions); - consumer.seekToEnd(partitions); - - for (final TopicPartition tp : partitions) { - System.out.println("End-offset for " + tp + " is " + consumer.position(tp)); - System.out.flush(); - } - } - System.out.flush(); - } finally { - TERMINATED.countDown(); - } - } - - public static void verify(final String kafka, final boolean withRepartitioning, final String groupProtocol) { - final Properties props = new Properties(); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier"); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString()); - - try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { - verifyAllTransactionFinished(consumer, kafka, withRepartitioning); - } catch (final Exception e) { - e.printStackTrace(System.err); - System.out.println("FAILED"); - return; - } - - final Map committedOffsets; - try (final Admin adminClient = Admin.create(props)) { - 
ensureStreamsApplicationDown(adminClient, groupProtocol); - - committedOffsets = getCommittedOffsets(adminClient, withRepartitioning); - } - - final String[] allInputTopics; - final String[] allOutputTopics; - if (withRepartitioning) { - allInputTopics = new String[] {"data", "repartition"}; - allOutputTopics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"}; - } else { - allInputTopics = new String[] {"data"}; - allOutputTopics = new String[] {"echo", "min", "sum"}; - } - - final Map>>> inputRecordsPerTopicPerPartition; - try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { - final List partitions = getAllPartitions(consumer, allInputTopics); - consumer.assign(partitions); - consumer.seekToBeginning(partitions); - - inputRecordsPerTopicPerPartition = getRecords(consumer, committedOffsets, withRepartitioning, true); - } catch (final Exception e) { - e.printStackTrace(System.err); - System.out.println("FAILED"); - return; - } - - final Map>>> outputRecordsPerTopicPerPartition; - try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) { - final List partitions = getAllPartitions(consumer, allOutputTopics); - consumer.assign(partitions); - consumer.seekToBeginning(partitions); - - outputRecordsPerTopicPerPartition = getRecords(consumer, consumer.endOffsets(partitions), withRepartitioning, false); - } catch (final Exception e) { - e.printStackTrace(System.err); - System.out.println("FAILED"); - return; - } - - verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("echo")); - if (withRepartitioning) { - verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("repartition")); - } - - verifyMin(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("min")); - verifySum(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("sum")); - - if (withRepartitioning) { - verifyMax(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("max")); - verifyCnt(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("cnt")); - } - - // do not modify: required test output - System.out.println("ALL-RECORDS-DELIVERED"); - System.out.flush(); - } - - private static void ensureStreamsApplicationDown(final Admin adminClient, final String groupProtocol) { - final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; - boolean isEmpty; - do { - if (Objects.equals(groupProtocol, "streams")) { - final StreamsGroupDescription description = getStreamsGroupDescription(adminClient); - isEmpty = description.members().isEmpty(); - if (System.currentTimeMillis() > maxWaitTime && !isEmpty) { - throwNotDownException(description); - } - } else { - final ConsumerGroupDescription description = getConsumerGroupDescription(adminClient); - isEmpty = description.members().isEmpty(); - if (System.currentTimeMillis() > maxWaitTime && !isEmpty) { - throwNotDownException(description); - } - } - sleep(1000L); - } while (!isEmpty); - } - - private static void throwNotDownException(final Object description) { - throw new RuntimeException( - "Streams application not down after " + MAX_IDLE_TIME_MS / 1000L + " seconds. 
" + - "Group: " + description - ); - } - - private static Map getCommittedOffsets(final Admin adminClient, - final boolean withRepartitioning) { - final Map topicPartitionOffsetAndMetadataMap; - - try { - final ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(EosTestClient.APP_ID); - topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); - } catch (final Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - - final Map committedOffsets = new HashMap<>(); - - for (final Map.Entry entry : topicPartitionOffsetAndMetadataMap.entrySet()) { - final String topic = entry.getKey().topic(); - if (topic.equals("data") || withRepartitioning && topic.equals("repartition")) { - committedOffsets.put(entry.getKey(), entry.getValue().offset()); - } - } - - return committedOffsets; - } - - private static Map>>> getRecords(final KafkaConsumer consumer, - final Map readEndOffsets, - final boolean withRepartitioning, - final boolean isInputTopic) { - System.out.println("read end offset: " + readEndOffsets); - final Map>>> recordPerTopicPerPartition = new HashMap<>(); - final Map maxReceivedOffsetPerPartition = new HashMap<>(); - final Map maxConsumerPositionPerPartition = new HashMap<>(); - - long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; - boolean allRecordsReceived = false; - while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) { - final ConsumerRecords receivedRecords = consumer.poll(Duration.ofSeconds(1L)); - - for (final ConsumerRecord record : receivedRecords) { - maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; - final TopicPartition tp = new TopicPartition(record.topic(), record.partition()); - maxReceivedOffsetPerPartition.put(tp, record.offset()); - final long readEndOffset = readEndOffsets.get(tp); - if (record.offset() < readEndOffset) { - addRecord(record, recordPerTopicPerPartition, withRepartitioning); - } else if (!isInputTopic) { - throw new RuntimeException("FAIL: did receive more records than expected for " + tp - + " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset()); - } - } - - for (final TopicPartition tp : readEndOffsets.keySet()) { - maxConsumerPositionPerPartition.put(tp, consumer.position(tp)); - if (consumer.position(tp) >= readEndOffsets.get(tp)) { - consumer.pause(Collections.singletonList(tp)); - } - } - - allRecordsReceived = consumer.paused().size() == readEndOffsets.keySet().size(); - } - - if (!allRecordsReceived) { - System.err.println("Pause partitions (ie, received all data): " + consumer.paused()); - System.err.println("Max received offset per partition: " + maxReceivedOffsetPerPartition); - System.err.println("Max consumer position per partition: " + maxConsumerPositionPerPartition); - throw new RuntimeException("FAIL: did not receive all records after " + (MAX_IDLE_TIME_MS / 1000L) + " sec idle time."); - } - - return recordPerTopicPerPartition; - } - - private static void addRecord(final ConsumerRecord record, - final Map>>> recordPerTopicPerPartition, - final boolean withRepartitioning) { - - final String topic = record.topic(); - final TopicPartition partition = new TopicPartition(topic, record.partition()); - - if (verifyTopic(topic, withRepartitioning)) { - final Map>> topicRecordsPerPartition = - recordPerTopicPerPartition.computeIfAbsent(topic, k -> new HashMap<>()); - - final List> records = - 
topicRecordsPerPartition.computeIfAbsent(partition, k -> new ArrayList<>()); - - records.add(record); - } else { - throw new RuntimeException("FAIL: received data from unexpected topic: " + record); - } - } - - private static boolean verifyTopic(final String topic, - final boolean withRepartitioning) { - final boolean validTopic = "data".equals(topic) || "echo".equals(topic) || "min".equals(topic) || "sum".equals(topic); - - if (withRepartitioning) { - return validTopic || "repartition".equals(topic) || "max".equals(topic) || "cnt".equals(topic); - } - - return validTopic; - } - - private static void verifyReceivedAllRecords(final Map>> expectedRecords, - final Map>> receivedRecords) { - if (expectedRecords.size() != receivedRecords.size()) { - throw new RuntimeException("Result verification failed. Received " + receivedRecords.size() + " records but expected " + expectedRecords.size()); - } - - final StringDeserializer stringDeserializer = new StringDeserializer(); - final IntegerDeserializer integerDeserializer = new IntegerDeserializer(); - for (final Map.Entry>> partitionRecords : receivedRecords.entrySet()) { - final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition()); - final List> receivedRecordsForPartition = partitionRecords.getValue(); - final List> expectedRecordsForPartition = expectedRecords.get(inputTopicPartition); - - System.out.println(partitionRecords.getKey() + " with " + receivedRecordsForPartition.size() + ", " + - inputTopicPartition + " with " + expectedRecordsForPartition.size()); - - final Iterator> expectedRecord = expectedRecordsForPartition.iterator(); - RuntimeException exception = null; - for (final ConsumerRecord receivedRecord : receivedRecordsForPartition) { - if (!expectedRecord.hasNext()) { - exception = new RuntimeException("Result verification failed for " + receivedRecord + " since there's no more expected record"); - } - - final ConsumerRecord expected = expectedRecord.next(); - - final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); - final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value()); - final String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key()); - final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value()); - - if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) { - exception = new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">"); - } - } - - if (exception != null) { - throw exception; - } - } - } - - private static void verifyMin(final Map>> inputPerTopicPerPartition, - final Map>> minPerTopicPerPartition) { - final StringDeserializer stringDeserializer = new StringDeserializer(); - final IntegerDeserializer integerDeserializer = new IntegerDeserializer(); - - final HashMap currentMinPerKey = new HashMap<>(); - for (final Map.Entry>> partitionRecords : minPerTopicPerPartition.entrySet()) { - final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition()); - final List> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); - final List> partitionMin = partitionRecords.getValue(); - - if (partitionInput.size() != partitionMin.size()) { - throw new RuntimeException("Result verification failed: expected " + 
partitionInput.size() + " records for " - + partitionRecords.getKey() + " but received " + partitionMin.size()); - } - - final Iterator> inputRecords = partitionInput.iterator(); - - for (final ConsumerRecord receivedRecord : partitionMin) { - final ConsumerRecord input = inputRecords.next(); - - final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); - final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value()); - final String key = stringDeserializer.deserialize(input.topic(), input.key()); - final int value = integerDeserializer.deserialize(input.topic(), input.value()); - - Integer min = currentMinPerKey.get(key); - if (min == null) { - min = value; - } else { - min = Math.min(min, value); - } - currentMinPerKey.put(key, min); - - if (!receivedKey.equals(key) || receivedValue != min) { - throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + min + "> but was <" + receivedKey + "," + receivedValue + ">"); - } - } - } - } - - private static void verifySum(final Map>> inputPerTopicPerPartition, - final Map>> minPerTopicPerPartition) { - final StringDeserializer stringDeserializer = new StringDeserializer(); - final IntegerDeserializer integerDeserializer = new IntegerDeserializer(); - final LongDeserializer longDeserializer = new LongDeserializer(); - - final HashMap currentSumPerKey = new HashMap<>(); - for (final Map.Entry>> partitionRecords : minPerTopicPerPartition.entrySet()) { - final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition()); - final List> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); - final List> partitionSum = partitionRecords.getValue(); - - if (partitionInput.size() != partitionSum.size()) { - throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " - + partitionRecords.getKey() + " but received " + partitionSum.size()); - } - - final Iterator> inputRecords = partitionInput.iterator(); - - for (final ConsumerRecord receivedRecord : partitionSum) { - final ConsumerRecord input = inputRecords.next(); - - final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); - final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value()); - final String key = stringDeserializer.deserialize(input.topic(), input.key()); - final int value = integerDeserializer.deserialize(input.topic(), input.value()); - - Long sum = currentSumPerKey.get(key); - if (sum == null) { - sum = (long) value; - } else { - sum += value; - } - currentSumPerKey.put(key, sum); - - if (!receivedKey.equals(key) || receivedValue != sum) { - throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + sum + "> but was <" + receivedKey + "," + receivedValue + ">"); - } - } - } - } - - private static void verifyMax(final Map>> inputPerTopicPerPartition, - final Map>> maxPerTopicPerPartition) { - final StringDeserializer stringDeserializer = new StringDeserializer(); - final IntegerDeserializer integerDeserializer = new IntegerDeserializer(); - - final HashMap currentMinPerKey = new HashMap<>(); - for (final Map.Entry>> partitionRecords : maxPerTopicPerPartition.entrySet()) { - final TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition()); - final List> 
partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); - final List> partitionMax = partitionRecords.getValue(); - - if (partitionInput.size() != partitionMax.size()) { - throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " - + partitionRecords.getKey() + " but received " + partitionMax.size()); - } - - final Iterator> inputRecords = partitionInput.iterator(); - - for (final ConsumerRecord receivedRecord : partitionMax) { - final ConsumerRecord input = inputRecords.next(); - - final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); - final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value()); - final String key = stringDeserializer.deserialize(input.topic(), input.key()); - final int value = integerDeserializer.deserialize(input.topic(), input.value()); - - - Integer max = currentMinPerKey.get(key); - if (max == null) { - max = Integer.MIN_VALUE; - } - max = Math.max(max, value); - currentMinPerKey.put(key, max); - - if (!receivedKey.equals(key) || receivedValue != max) { - throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + max + "> but was <" + receivedKey + "," + receivedValue + ">"); - } - } - } - } - - private static void verifyCnt(final Map>> inputPerTopicPerPartition, - final Map>> cntPerTopicPerPartition) { - final StringDeserializer stringDeserializer = new StringDeserializer(); - final LongDeserializer longDeserializer = new LongDeserializer(); - - final HashMap currentSumPerKey = new HashMap<>(); - for (final Map.Entry>> partitionRecords : cntPerTopicPerPartition.entrySet()) { - final TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition()); - final List> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition); - final List> partitionCnt = partitionRecords.getValue(); - - if (partitionInput.size() != partitionCnt.size()) { - throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for " - + partitionRecords.getKey() + " but received " + partitionCnt.size()); - } - - final Iterator> inputRecords = partitionInput.iterator(); - - for (final ConsumerRecord receivedRecord : partitionCnt) { - final ConsumerRecord input = inputRecords.next(); - - final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key()); - final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value()); - final String key = stringDeserializer.deserialize(input.topic(), input.key()); - - Long cnt = currentSumPerKey.get(key); - if (cnt == null) { - cnt = 0L; - } - currentSumPerKey.put(key, ++cnt); - - if (!receivedKey.equals(key) || receivedValue != cnt) { - throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + cnt + "> but was <" + receivedKey + "," + receivedValue + ">"); - } - } - } - } - - private static void verifyAllTransactionFinished(final KafkaConsumer consumer, - final String kafka, - final boolean withRepartitioning) { - final String[] topics; - if (withRepartitioning) { - topics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"}; - } else { - topics = new String[] {"echo", "min", "sum"}; - } - - final List partitions = getAllPartitions(consumer, topics); - consumer.assign(partitions); - consumer.seekToEnd(partitions); - for 
(final TopicPartition tp : partitions) { - System.out.println(tp + " at position " + consumer.position(tp)); - } - - final Properties consumerProps = new Properties(); - consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-uncommitted"); - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - - - final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; - try (final KafkaConsumer consumerUncommitted = new KafkaConsumer<>(consumerProps)) { - while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) { - consumer.seekToEnd(partitions); - final Map topicEndOffsets = consumerUncommitted.endOffsets(partitions); - - final Iterator iterator = partitions.iterator(); - while (iterator.hasNext()) { - final TopicPartition topicPartition = iterator.next(); - final long position = consumer.position(topicPartition); - - if (position == topicEndOffsets.get(topicPartition)) { - iterator.remove(); - System.out.println("Removing " + topicPartition + " at position " + position); - } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) { - throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition)); - } else { - System.out.println("Retry " + topicPartition + " at position " + position); - } - } - sleep(1000L); - } - } - - if (!partitions.isEmpty()) { - throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec."); - } - } - - private static List getAllPartitions(final KafkaConsumer consumer, - final String... 
                                                          topics) {
-        final ArrayList partitions = new ArrayList<>();
-
-        for (final String topic : topics) {
-            for (final PartitionInfo info : consumer.partitionsFor(topic)) {
-                partitions.add(new TopicPartition(info.topic(), info.partition()));
-            }
-        }
-        return partitions;
-    }
-
-    private static ConsumerGroupDescription getConsumerGroupDescription(final Admin adminClient) {
-        final ConsumerGroupDescription description;
-        try {
-            description = adminClient.describeConsumerGroups(Collections.singleton(EosTestClient.APP_ID))
-                .describedGroups()
-                .get(EosTestClient.APP_ID)
-                .get(10, TimeUnit.SECONDS);
-        } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
-            e.printStackTrace();
-            throw new RuntimeException("Unexpected Exception getting group description", e);
-        }
-        return description;
-    }
-
-    private static StreamsGroupDescription getStreamsGroupDescription(final Admin adminClient) {
-        final StreamsGroupDescription description;
-        try {
-            description = adminClient.describeStreamsGroups(Collections.singleton(EosTestClient.APP_ID))
-                .describedGroups()
-                .get(EosTestClient.APP_ID)
-                .get(10, TimeUnit.SECONDS);
-        } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
-            e.printStackTrace();
-            throw new RuntimeException("Unexpected Exception getting group description", e);
-        }
-        return description;
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 2f9dee745d1..047f8b14794 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -381,25 +381,25 @@ public class SmokeTestDriver extends SmokeTestUtil {
                                               final int maxRecordsPerKey,
                                               final boolean eosEnabled) {
         final Properties props = createConsumerProperties(kafka);
-        try (final KafkaConsumer consumer = new KafkaConsumer<>(props)) {
-            final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
-            consumer.assign(partitions);
-            consumer.seekToBeginning(partitions);
+        final KafkaConsumer consumer = new KafkaConsumer<>(props);
+        final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
 
-            final int recordsGenerated = inputs.size() * maxRecordsPerKey;
-            final RecordProcessingState state = new RecordProcessingState(recordsGenerated);
-            final Map>>> events = new HashMap<>();
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        final RecordProcessingState state = new RecordProcessingState(recordsGenerated);
+        final Map>>> events = new HashMap<>();
 
-            final long start = System.currentTimeMillis();
-            final VerificationResult verificationResult = consumeAndProcessRecords(consumer, inputs, events, state, start, eosEnabled);
+        final long start = System.currentTimeMillis();
+        final VerificationResult verificationResult = consumeAndProcessRecords(consumer, inputs, events, state, start, eosEnabled);
+        consumer.close();
 
-            final VerificationResult eosResult = performEosVerification(eosEnabled, kafka);
-            if (!eosResult.passed()) {
-                return eosResult;
-            }
-
-            return validateAndReportResults(inputs, events, state, verificationResult, start, eosEnabled);
+        final VerificationResult eosResult = performEosVerification(eosEnabled, kafka);
+        if (!eosResult.passed()) {
+            return eosResult;
         }
+
+        return validateAndReportResults(inputs, events, state, verificationResult, start, eosEnabled);
     }
 
     private static Properties createConsumerProperties(final String kafka) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
deleted file mode 100644
index 9bb57e286d3..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.io.IOException;
-import java.util.Properties;
-
-public class StreamsEosTest {
-
-    /**
-     *  args ::= kafka propFileName command
-     *  command := "run" | "process" | "verify"
-     */
-    public static void main(final String[] args) throws IOException {
-        if (args.length < 2) {
-            System.err.println("StreamsEosTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
-            Exit.exit(1);
-        }
-
-        final String propFileName = args[0];
-        final String command = args[1];
-
-        final Properties streamsProperties = Utils.loadProps(propFileName);
-        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
-        final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
-
-        if (kafka == null) {
-            System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
-            Exit.exit(1);
-        }
-
-        if ("process".equals(command) || "process-complex".equals(command)) {
-            if (!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {
-
-                System.err.println("processingGuarantee must be " + StreamsConfig.EXACTLY_ONCE_V2);
-                Exit.exit(1);
-            }
-        }
-
-        System.out.println("StreamsTest instance started");
-        System.out.println("kafka=" + kafka);
-        System.out.println("props=" + streamsProperties);
-        System.out.println("command=" + command);
-        System.out.flush();
-
-        if (command == null || propFileName == null) {
-            Exit.exit(-1);
-        }
-
-        switch (command) {
-            case "run":
-                EosTestDriver.generate(kafka);
-                break;
-            case "process":
-                new EosTestClient(streamsProperties, false).start();
-                break;
-            case "process-complex":
-                new EosTestClient(streamsProperties, true).start();
-                break;
-            case "verify":
-                EosTestDriver.verify(kafka, false, streamsProperties.getProperty("group.protocol"));
-                break;
-            case "verify-complex":
-                EosTestDriver.verify(kafka, true, streamsProperties.getProperty("group.protocol"));
-                break;
-            default:
-                System.out.println("unknown command: " + command);
-                System.out.flush();
-                Exit.exit(-1);
-        }
-    }
-
-}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 3a35792a387..734ed8a3fd1 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -443,27 +443,6 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka, processing_guarantee, group_protocol = 'classic', num_threads = 3, replication_factor = 3):
         super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", processing_guarantee, group_protocol, num_threads, replication_factor)
 
-class StreamsEosTestDriverService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run", "classic")
-
-class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka, group_protocol):
-        super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process", group_protocol)
-
-class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka, group_protocol):
-        super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex", group_protocol)
-
-class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka, group_protocol):
-        super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify", group_protocol)
-
-
-class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka, group_protocol):
-        super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex", group_protocol)
-
 class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
 
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
deleted file mode 100644
index fcab8adfe91..00000000000
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ /dev/null
@@ -1,183 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import matrix
-from ducktape.mark.resource import cluster
-from kafkatest.services.kafka import quorum
-from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
-    StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
-from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
-
-class StreamsEosTest(BaseStreamsTest):
-    """
-    Test of Kafka Streams exactly-once semantics
-    """
-
-    def __init__(self, test_context):
-        super(StreamsEosTest, self).__init__(test_context, num_controllers=1, num_brokers=3, topics={
-            'data': {'partitions': 5, 'replication-factor': 2},
-            'echo': {'partitions': 5, 'replication-factor': 2},
-            'min': {'partitions': 5, 'replication-factor': 2},
-            'sum': {'partitions': 5, 'replication-factor': 2},
-            'repartition': {'partitions': 5, 'replication-factor': 2},
-            'max': {'partitions': 5, 'replication-factor': 2},
-            'cnt': {'partitions': 5, 'replication-factor': 2}
-        })
-        self.driver = StreamsEosTestDriverService(test_context, self.kafka)
-        self.test_context = test_context
-
-    @cluster(num_nodes=8)
-    @matrix(metadata_quorum=[quorum.combined_kraft],
-            group_protocol=["classic", "streams"])
-    def test_rebalance_simple(self, metadata_quorum, group_protocol):
-        self.group_protocol = group_protocol
-        self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
-    @cluster(num_nodes=8)
-    @matrix(metadata_quorum=[quorum.combined_kraft],
-            group_protocol=["classic", "streams"])
-    def test_rebalance_complex(self, metadata_quorum, group_protocol):
-        self.group_protocol = group_protocol
-        self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
-
-    def run_rebalance(self, processor1, processor2, processor3, verifier):
-        """
-        Starts and stops two test clients a few times.
-        Ensure that all records are delivered exactly-once.
-        """
-
-        self.driver.start()
-
-        self.add_streams(processor1)
-        processor1.clean_node_enabled = False
-        self.add_streams2(processor1, processor2)
-        self.add_streams3(processor1, processor2, processor3)
-        self.stop_streams3(processor2, processor3, processor1)
-        self.add_streams3(processor2, processor3, processor1)
-        self.stop_streams3(processor1, processor3, processor2)
-        self.stop_streams2(processor1, processor3)
-        self.stop_streams(processor1)
-        processor1.clean_node_enabled = True
-
-        self.driver.stop()
-
-        verifier.start()
-        verifier.wait()
-
-        verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
-
-    @cluster(num_nodes=8)
-    @matrix(metadata_quorum=[quorum.combined_kraft],
-            group_protocol=["classic", "streams"])
-    def test_failure_and_recovery(self, metadata_quorum, group_protocol):
-        self.group_protocol = group_protocol
-        self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
-    @cluster(num_nodes=8)
-    @matrix(metadata_quorum=[quorum.combined_kraft],
-            group_protocol=["classic", "streams"])
-    def test_failure_and_recovery_complex(self, metadata_quorum, group_protocol):
-        self.group_protocol = group_protocol
-        self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
-
-    def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):
-        """
-        Starts two test clients, then abort (kill -9) and restart them a few times.
-        Ensure that all records are delivered exactly-once.
-        """
-
-        self.driver.start()
-
-        self.add_streams(processor1)
-        processor1.clean_node_enabled = False
-        self.add_streams2(processor1, processor2)
-        self.add_streams3(processor1, processor2, processor3)
-        self.abort_streams(processor2, processor3, processor1)
-        self.add_streams3(processor2, processor3, processor1)
-        self.abort_streams(processor2, processor3, processor1)
-        self.add_streams3(processor2, processor3, processor1)
-        self.abort_streams(processor1, processor3, processor2)
-        self.stop_streams2(processor1, processor3)
-        self.stop_streams(processor1)
-        processor1.clean_node_enabled = True
-
-        self.driver.stop()
-
-        verifier.start()
-        verifier.wait()
-
-        verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
-
-    def add_streams(self, processor):
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.start()
-            self.wait_for_startup(monitor, processor)
-
-    def add_streams2(self, running_processor, processor_to_be_started):
-        with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor:
-            self.add_streams(processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor)
-
-    def add_streams3(self, running_processor1, running_processor2, processor_to_be_started):
-        with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor:
-            self.add_streams2(running_processor2, processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor1)
-
-    def stop_streams(self, processor_to_be_stopped):
-        with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2:
-            processor_to_be_stopped.stop()
-            self.wait_for(monitor2, processor_to_be_stopped, "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING")
-
-    def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
-        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor:
-            self.stop_streams(processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor)
-
-    def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor:
-            self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor1)
-
-    def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1:
-            with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2:
-                processor_to_be_aborted.stop_nodes(False)
-                self.wait_for_startup(monitor2, keep_alive_processor2)
-            self.wait_for_startup(monitor1, keep_alive_processor1)
-
-    def wait_for_startup(self, monitor, processor):
-        if self.group_protocol == "classic":
-            self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
-        else:
-            # In the streams group protocol, not all members will take part in the rebalance.
-            # We can indirectly observe the progress of the group by seeing the member epoch being bumped.
-            self.wait_for(monitor, processor, "MemberEpochBump")
-        self.wait_for(monitor, processor, "processed [0-9]* records from topic")
-
-    @staticmethod
-    def wait_for(monitor, processor, output):
-        monitor.wait_until(output,
-                           timeout_sec=480,
-                           err_msg=("Never saw output '%s' on " % output) + str(processor.node.account))