From 78dd40123cc47e425264f516a4b8c6c2003c99b8 Mon Sep 17 00:00:00 2001 From: Alex Sorokoumov <918393+Gerrrr@users.noreply.github.com> Date: Sat, 14 May 2022 02:21:27 +0200 Subject: [PATCH] MINOR: Add upgrade tests for FK joins (#12122) Follow up PR for KAFKA-13769. Reviewers: Matthias J. Sax --- .../kafka/streams/tests/SmokeTestDriver.java | 106 +++++++++++----- .../kafka/streams/tests/SmokeTestDriver.java | 115 +++++++++++------ .../streams/tests/StreamsUpgradeTest.java | 45 ++++++- .../kafka/streams/tests/SmokeTestDriver.java | 114 ++++++++++++----- .../streams/tests/StreamsUpgradeTest.java | 45 ++++++- .../kafka/streams/tests/SmokeTestDriver.java | 116 ++++++++++++----- .../streams/tests/StreamsUpgradeTest.java | 45 ++++++- .../kafka/streams/tests/SmokeTestDriver.java | 101 +++++++++++---- .../streams/tests/StreamsUpgradeTest.java | 45 ++++++- .../kafka/streams/tests/SmokeTestDriver.java | 114 ++++++++++++----- .../streams/tests/StreamsUpgradeTest.java | 45 ++++++- .../kafka/streams/tests/SmokeTestDriver.java | 114 ++++++++++++----- .../streams/tests/StreamsUpgradeTest.java | 45 ++++++- .../kafka/streams/tests/SmokeTestDriver.java | 118 ++++++++++++------ .../streams/tests/StreamsUpgradeTest.java | 45 ++++++- tests/kafkatest/services/streams.py | 5 +- .../tests/streams/streams_upgrade_test.py | 77 +++++++----- tests/kafkatest/version.py | 3 + 18 files changed, 969 insertions(+), 329 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index ac83cd95eba..2bbb25db395 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkEntry; public class SmokeTestDriver extends SmokeTestUtil { - private static final String[] TOPICS = { + private static final String[] NUMERIC_VALUE_TOPICS = { "data", "echo", "max", @@ -72,6 +73,15 @@ public class SmokeTestDriver extends SmokeTestUtil { "avg", "tagg" }; + private static final String[] STRING_VALUE_TOPICS = { + "fk" + }; + + private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length]; + static { + System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length); + System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length); + } private static final int MAX_RECORD_EMPTY_RETRIES = 30; @@ -163,7 +173,8 @@ public class SmokeTestDriver extends SmokeTestUtil { final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; - List> needRetry = new ArrayList<>(); + final List> dataNeedRetry = new ArrayList<>(); + final List> fkNeedRetry = new ArrayList<>(); try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { while (remaining > 0) { @@ -183,7 +194,16 @@ public class SmokeTestDriver extends SmokeTestUtil { 
intSerde.serializer().serialize("", value) ); - producer.send(record, new TestCallback(record, needRetry)); + producer.send(record, new TestCallback(record, dataNeedRetry)); + + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + + producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry)); numRecordsProduced++; allData.get(key).add(value); @@ -195,38 +215,62 @@ public class SmokeTestDriver extends SmokeTestUtil { } producer.flush(); - int remainingRetries = 5; - while (!needRetry.isEmpty()) { - final List> needRetry2 = new ArrayList<>(); - for (final ProducerRecord record : needRetry) { - System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); - producer.send(record, new TestCallback(record, needRetry2)); - } - producer.flush(); - needRetry = needRetry2; + retry(producer, dataNeedRetry, stringSerde); + retry(producer, fkNeedRetry, intSerde); - if (--remainingRetries == 0 && !needRetry.isEmpty()) { - System.err.println("Failed to produce all records after multiple retries"); - Exit.exit(1); - } - } + flush(producer, + "data", + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + ); + flush(producer, + "fk", + intSerde.serializer().serialize("", 0), + stringSerde.serializer().serialize("", "flush") + ); - // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out - // all suppressed records. - final List partitions = producer.partitionsFor("data"); - for (final PartitionInfo partition : partitions) { - producer.send(new ProducerRecord<>( - partition.topic(), - partition.partition(), - System.currentTimeMillis() + Duration.ofDays(2).toMillis(), - stringSerde.serializer().serialize("", "flush"), - intSerde.serializer().serialize("", 0) - )); - } } return Collections.unmodifiableMap(allData); } + private static void retry(final KafkaProducer producer, + List> needRetry, + final Serde keySerde) { + int remainingRetries = 5; + while (!needRetry.isEmpty()) { + final List> needRetry2 = new ArrayList<>(); + for (final ProducerRecord record : needRetry) { + System.out.println("retry producing " + keySerde.deserializer().deserialize("", record.key())); + producer.send(record, new TestCallback(record, needRetry2)); + } + producer.flush(); + needRetry = needRetry2; + + if (--remainingRetries == 0 && !needRetry.isEmpty()) { + System.err.println("Failed to produce all records after multiple retries"); + Exit.exit(1); + } + } + } + + private static void flush(final KafkaProducer producer, + final String topic, + final byte[] keyBytes, + final byte[] valBytes) { + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. 
+ final List partitions = producer.partitionsFor(topic); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + keyBytes, + valBytes + )); + } + } + private static Properties generatorProperties(final String kafka) { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); @@ -315,14 +359,14 @@ public class SmokeTestDriver extends SmokeTestUtil { props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, TOPICS); + final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS); consumer.assign(partitions); consumer.seekToBeginning(partitions); final int recordsGenerated = inputs.size() * maxRecordsPerKey; int recordsProcessed = 0; final Map processed = - Stream.of(TOPICS) + Stream.of(NUMERIC_VALUE_TOPICS) .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); final Map>>> events = new HashMap<>(); diff --git a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index ac83cd95eba..5c4a8cd615b 100644 --- a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkEntry; public class SmokeTestDriver extends SmokeTestUtil { - private static final String[] TOPICS = { + private static final String[] NUMERIC_VALUE_TOPICS = { "data", "echo", "max", @@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil { "avg", "tagg" }; + private static final String[] STRING_VALUE_TOPICS = { + "fk" + }; + private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length]; + static { + System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length); + System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length); + } private static final int MAX_RECORD_EMPTY_RETRIES = 30; @@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil { stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); - producer.send(record); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord); + numRecordsProduced++; if (numRecordsProduced % 100 == 0) { System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); @@ -148,7 +164,6 @@ public class SmokeTestDriver extends SmokeTestUtil { final Duration timeToSpend) { final Properties producerProps = 
generatorProperties(kafka); - int numRecordsProduced = 0; final Map> allData = new HashMap<>(); @@ -163,7 +178,8 @@ public class SmokeTestDriver extends SmokeTestUtil { final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; - List> needRetry = new ArrayList<>(); + final List> dataNeedRetry = new ArrayList<>(); + final List> fkNeedRetry = new ArrayList<>(); try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { while (remaining > 0) { @@ -175,15 +191,21 @@ public class SmokeTestDriver extends SmokeTestUtil { remaining--; data[index] = data[remaining]; } else { - final ProducerRecord record = new ProducerRecord<>( "data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); + producer.send(record, new TestCallback(record, dataNeedRetry)); - producer.send(record, new TestCallback(record, needRetry)); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry)); numRecordsProduced++; allData.get(key).add(value); @@ -195,38 +217,61 @@ public class SmokeTestDriver extends SmokeTestUtil { } producer.flush(); - int remainingRetries = 5; - while (!needRetry.isEmpty()) { - final List> needRetry2 = new ArrayList<>(); - for (final ProducerRecord record : needRetry) { - System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); - producer.send(record, new TestCallback(record, needRetry2)); - } - producer.flush(); - needRetry = needRetry2; + retry(producer, dataNeedRetry, stringSerde); + retry(producer, fkNeedRetry, intSerde); - if (--remainingRetries == 0 && !needRetry.isEmpty()) { - System.err.println("Failed to produce all records after multiple retries"); - Exit.exit(1); - } - } - - // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out - // all suppressed records. 
- final List partitions = producer.partitionsFor("data"); - for (final PartitionInfo partition : partitions) { - producer.send(new ProducerRecord<>( - partition.topic(), - partition.partition(), - System.currentTimeMillis() + Duration.ofDays(2).toMillis(), - stringSerde.serializer().serialize("", "flush"), - intSerde.serializer().serialize("", 0) - )); - } + flush(producer, + "data", + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + ); + flush(producer, + "fk", + intSerde.serializer().serialize("", 0), + stringSerde.serializer().serialize("", "flush") + ); } return Collections.unmodifiableMap(allData); } + private static void retry(final KafkaProducer producer, + List> needRetry, + final Serde keySerde) { + int remainingRetries = 5; + while (!needRetry.isEmpty()) { + final List> needRetry2 = new ArrayList<>(); + for (final ProducerRecord record : needRetry) { + System.out.println( + "retry producing " + keySerde.deserializer().deserialize("", record.key())); + producer.send(record, new TestCallback(record, needRetry2)); + } + producer.flush(); + needRetry = needRetry2; + if (--remainingRetries == 0 && !needRetry.isEmpty()) { + System.err.println("Failed to produce all records after multiple retries"); + Exit.exit(1); + } + } + } + + private static void flush(final KafkaProducer producer, + final String topic, + final byte[] keyBytes, + final byte[] valBytes) { + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. + final List partitions = producer.partitionsFor(topic); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + keyBytes, + valBytes + )); + } + } + private static Properties generatorProperties(final String kafka) { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); @@ -315,14 +360,14 @@ public class SmokeTestDriver extends SmokeTestUtil { props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, TOPICS); + final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS); consumer.assign(partitions); consumer.seekToBeginning(partitions); final int recordsGenerated = inputs.size() * maxRecordsPerKey; int recordsProcessed = 0; final Map processed = - Stream.of(TOPICS) + Stream.of(NUMERIC_VALUE_TOPICS) .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); final Map>>> events = new HashMap<>(); diff --git a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index c0c8c72c599..9d08663d9b3 100644 --- a/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,11 +16,18 @@ */ package org.apache.kafka.streams.tests; +import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; +import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; + +import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import 
org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -42,12 +49,29 @@ public class StreamsUpgradeTest { System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); - final KStream dataStream = builder.stream("data"); - dataStream.process(printProcessorSupplier()); + final KTable dataTable = builder.table( + "data", Consumed.with(stringSerde, intSerde)); + final KStream dataStream = dataTable.toStream(); + dataStream.process(printProcessorSupplier("data")); dataStream.to("echo"); + final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty( + "test.run_fk_join", + "false")); + if (runFkJoin) { + try { + final KTable fkTable = builder.table( + "fk", Consumed.with(intSerde, stringSerde)); + buildFKTable(dataTable, fkTable); + } catch (final Exception e) { + System.err.println("Caught " + e.getMessage()); + } + } + final Properties config = new Properties(); - config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty( + StreamsConfig.APPLICATION_ID_CONFIG, + "StreamsUpgradeTest-" + new Random().nextLong()); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); @@ -61,13 +85,22 @@ public class StreamsUpgradeTest { })); } - private static ProcessorSupplier printProcessorSupplier() { + private static void buildFKTable(final KTable primaryTable, + final KTable otherTable) { + final KStream kStream = primaryTable + .join(otherTable, v -> v, (k0, v0) -> v0) + .toStream(); + kStream.process(printProcessorSupplier("fk")); + kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); + } + + private static ProcessorSupplier printProcessorSupplier(final String topic) { return () -> new AbstractProcessor() { private int numRecordsProcessed = 0; @Override public void init(final ProcessorContext context) { - System.out.println("[2.4] initializing processor: topic=data taskId=" + context.taskId()); + System.out.println("[2.4] initializing processor: topic=" + topic + " taskId=" + context.taskId()); numRecordsProcessed = 0; } @@ -75,7 +108,7 @@ public class StreamsUpgradeTest { public void process(final K key, final V value) { numRecordsProcessed++; if (numRecordsProcessed % 100 == 0) { - System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); } } diff --git a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index ac83cd95eba..4dae6eae575 100644 --- a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import 
org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkEntry; public class SmokeTestDriver extends SmokeTestUtil { - private static final String[] TOPICS = { + private static final String[] NUMERIC_VALUE_TOPICS = { "data", "echo", "max", @@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil { "avg", "tagg" }; + private static final String[] STRING_VALUE_TOPICS = { + "fk" + }; + private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length]; + static { + System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length); + System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length); + } private static final int MAX_RECORD_EMPTY_RETRIES = 30; @@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil { stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); - producer.send(record); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord); + numRecordsProduced++; if (numRecordsProduced % 100 == 0) { System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); @@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil { final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; - List> needRetry = new ArrayList<>(); + final List> dataNeedRetry = new ArrayList<>(); + final List> fkNeedRetry = new ArrayList<>(); try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { while (remaining > 0) { @@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil { remaining--; data[index] = data[remaining]; } else { - final ProducerRecord record = new ProducerRecord<>( "data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); + producer.send(record, new TestCallback(record, dataNeedRetry)); - producer.send(record, new TestCallback(record, needRetry)); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry)); numRecordsProduced++; allData.get(key).add(value); @@ -195,38 +218,61 @@ public class SmokeTestDriver extends SmokeTestUtil { } producer.flush(); - int remainingRetries = 5; - while (!needRetry.isEmpty()) { - final List> needRetry2 = new ArrayList<>(); - for (final ProducerRecord record : needRetry) { - System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); - producer.send(record, new TestCallback(record, needRetry2)); - } - producer.flush(); - needRetry = needRetry2; + retry(producer, dataNeedRetry, stringSerde); + retry(producer, fkNeedRetry, intSerde); - if (--remainingRetries == 0 && !needRetry.isEmpty()) { - System.err.println("Failed to produce all records after multiple retries"); - Exit.exit(1); - } - } - - // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out - // all suppressed records. 
- final List partitions = producer.partitionsFor("data"); - for (final PartitionInfo partition : partitions) { - producer.send(new ProducerRecord<>( - partition.topic(), - partition.partition(), - System.currentTimeMillis() + Duration.ofDays(2).toMillis(), - stringSerde.serializer().serialize("", "flush"), - intSerde.serializer().serialize("", 0) - )); - } + flush(producer, + "data", + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + ); + flush(producer, + "fk", + intSerde.serializer().serialize("", 0), + stringSerde.serializer().serialize("", "flush") + ); } return Collections.unmodifiableMap(allData); } + private static void retry(final KafkaProducer producer, + List> needRetry, + final Serde keySerde) { + int remainingRetries = 5; + while (!needRetry.isEmpty()) { + final List> needRetry2 = new ArrayList<>(); + for (final ProducerRecord record : needRetry) { + System.out.println( + "retry producing " + keySerde.deserializer().deserialize("", record.key())); + producer.send(record, new TestCallback(record, needRetry2)); + } + producer.flush(); + needRetry = needRetry2; + if (--remainingRetries == 0 && !needRetry.isEmpty()) { + System.err.println("Failed to produce all records after multiple retries"); + Exit.exit(1); + } + } + } + + private static void flush(final KafkaProducer producer, + final String topic, + final byte[] keyBytes, + final byte[] valBytes) { + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. + final List partitions = producer.partitionsFor(topic); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + keyBytes, + valBytes + )); + } + } + private static Properties generatorProperties(final String kafka) { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); @@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil { props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, TOPICS); + final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS); consumer.assign(partitions); consumer.seekToBeginning(partitions); final int recordsGenerated = inputs.size() * maxRecordsPerKey; int recordsProcessed = 0; final Map processed = - Stream.of(TOPICS) + Stream.of(NUMERIC_VALUE_TOPICS) .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); final Map>>> events = new HashMap<>(); diff --git a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 0fea040bcb4..69c46de37af 100644 --- a/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,11 +16,18 @@ */ package org.apache.kafka.streams.tests; +import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; +import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; + +import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import 
org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -42,12 +49,29 @@ public class StreamsUpgradeTest { System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); - final KStream dataStream = builder.stream("data"); - dataStream.process(printProcessorSupplier()); + final KTable dataTable = builder.table( + "data", Consumed.with(stringSerde, intSerde)); + final KStream dataStream = dataTable.toStream(); + dataStream.process(printProcessorSupplier("data")); dataStream.to("echo"); + final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty( + "test.run_fk_join", + "false")); + if (runFkJoin) { + try { + final KTable fkTable = builder.table( + "fk", Consumed.with(intSerde, stringSerde)); + buildFKTable(dataTable, fkTable); + } catch (final Exception e) { + System.err.println("Caught " + e.getMessage()); + } + } + final Properties config = new Properties(); - config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty( + StreamsConfig.APPLICATION_ID_CONFIG, + "StreamsUpgradeTest-" + new Random().nextLong()); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); @@ -61,13 +85,22 @@ public class StreamsUpgradeTest { })); } - private static ProcessorSupplier printProcessorSupplier() { + private static void buildFKTable(final KTable primaryTable, + final KTable otherTable) { + final KStream kStream = primaryTable + .join(otherTable, v -> v, (k0, v0) -> v0) + .toStream(); + kStream.process(printProcessorSupplier("fk")); + kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); + } + + private static ProcessorSupplier printProcessorSupplier(final String topic) { return () -> new AbstractProcessor() { private int numRecordsProcessed = 0; @Override public void init(final ProcessorContext context) { - System.out.println("[2.5] initializing processor: topic=data taskId=" + context.taskId()); + System.out.println("[2.5] initializing processor: topic=" + topic + " taskId=" + context.taskId()); numRecordsProcessed = 0; } @@ -75,7 +108,7 @@ public class StreamsUpgradeTest { public void process(final K key, final V value) { numRecordsProcessed++; if (numRecordsProcessed % 100 == 0) { - System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); } } diff --git a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index ac83cd95eba..0e08771495f 100644 --- a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import 
org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkEntry; public class SmokeTestDriver extends SmokeTestUtil { - private static final String[] TOPICS = { + private static final String[] NUMERIC_VALUE_TOPICS = { "data", "echo", "max", @@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil { "avg", "tagg" }; + private static final String[] STRING_VALUE_TOPICS = { + "fk" + }; + private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length]; + static { + System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length); + System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length); + } private static final int MAX_RECORD_EMPTY_RETRIES = 30; @@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil { stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); - producer.send(record); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord); + numRecordsProduced++; if (numRecordsProduced % 100 == 0) { System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); @@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil { final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; - List> needRetry = new ArrayList<>(); + final List> dataNeedRetry = new ArrayList<>(); + final List> fkNeedRetry = new ArrayList<>(); try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { while (remaining > 0) { @@ -175,7 +192,6 @@ public class SmokeTestDriver extends SmokeTestUtil { remaining--; data[index] = data[remaining]; } else { - final ProducerRecord record = new ProducerRecord<>( "data", @@ -183,7 +199,16 @@ public class SmokeTestDriver extends SmokeTestUtil { intSerde.serializer().serialize("", value) ); - producer.send(record, new TestCallback(record, needRetry)); + producer.send(record, new TestCallback(record, dataNeedRetry)); + + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + + producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry)); numRecordsProduced++; allData.get(key).add(value); @@ -195,38 +220,61 @@ public class SmokeTestDriver extends SmokeTestUtil { } producer.flush(); - int remainingRetries = 5; - while (!needRetry.isEmpty()) { - final List> needRetry2 = new ArrayList<>(); - for (final ProducerRecord record : needRetry) { - System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); - producer.send(record, new TestCallback(record, needRetry2)); - } - producer.flush(); - needRetry = needRetry2; + retry(producer, dataNeedRetry, stringSerde); + retry(producer, fkNeedRetry, intSerde); - if (--remainingRetries == 0 && !needRetry.isEmpty()) { - System.err.println("Failed to produce all records after multiple retries"); - Exit.exit(1); - } - } - - // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out - // 
all suppressed records. - final List partitions = producer.partitionsFor("data"); - for (final PartitionInfo partition : partitions) { - producer.send(new ProducerRecord<>( - partition.topic(), - partition.partition(), - System.currentTimeMillis() + Duration.ofDays(2).toMillis(), - stringSerde.serializer().serialize("", "flush"), - intSerde.serializer().serialize("", 0) - )); - } + flush(producer, + "data", + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + ); + flush(producer, + "fk", + intSerde.serializer().serialize("", 0), + stringSerde.serializer().serialize("", "flush") + ); } return Collections.unmodifiableMap(allData); } + private static void retry(final KafkaProducer producer, + List> needRetry, + final Serde keySerde) { + int remainingRetries = 5; + while (!needRetry.isEmpty()) { + final List> needRetry2 = new ArrayList<>(); + for (final ProducerRecord record : needRetry) { + System.out.println( + "retry producing " + keySerde.deserializer().deserialize("", record.key())); + producer.send(record, new TestCallback(record, needRetry2)); + } + producer.flush(); + needRetry = needRetry2; + if (--remainingRetries == 0 && !needRetry.isEmpty()) { + System.err.println("Failed to produce all records after multiple retries"); + Exit.exit(1); + } + } + } + + private static void flush(final KafkaProducer producer, + final String topic, + final byte[] keyBytes, + final byte[] valBytes) { + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. + final List partitions = producer.partitionsFor(topic); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + keyBytes, + valBytes + )); + } + } + private static Properties generatorProperties(final String kafka) { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); @@ -315,14 +363,14 @@ public class SmokeTestDriver extends SmokeTestUtil { props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, TOPICS); + final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS); consumer.assign(partitions); consumer.seekToBeginning(partitions); final int recordsGenerated = inputs.size() * maxRecordsPerKey; int recordsProcessed = 0; final Map processed = - Stream.of(TOPICS) + Stream.of(NUMERIC_VALUE_TOPICS) .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); final Map>>> events = new HashMap<>(); diff --git a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index e1b294ff15b..0844552134a 100644 --- a/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-26/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,11 +16,18 @@ */ package org.apache.kafka.streams.tests; +import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; +import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; + +import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import 
org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -42,12 +49,29 @@ public class StreamsUpgradeTest { System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); - final KStream dataStream = builder.stream("data"); - dataStream.process(printProcessorSupplier()); + final KTable dataTable = builder.table( + "data", Consumed.with(stringSerde, intSerde)); + final KStream dataStream = dataTable.toStream(); + dataStream.process(printProcessorSupplier("data")); dataStream.to("echo"); + final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty( + "test.run_fk_join", + "false")); + if (runFkJoin) { + try { + final KTable fkTable = builder.table( + "fk", Consumed.with(intSerde, stringSerde)); + buildFKTable(dataTable, fkTable); + } catch (final Exception e) { + System.err.println("Caught " + e.getMessage()); + } + } + final Properties config = new Properties(); - config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty( + StreamsConfig.APPLICATION_ID_CONFIG, + "StreamsUpgradeTest-" + new Random().nextLong()); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); @@ -61,13 +85,22 @@ public class StreamsUpgradeTest { })); } - private static ProcessorSupplier printProcessorSupplier() { + private static void buildFKTable(final KTable primaryTable, + final KTable otherTable) { + final KStream kStream = primaryTable + .join(otherTable, v -> v, (k0, v0) -> v0) + .toStream(); + kStream.process(printProcessorSupplier("fk")); + kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); + } + + private static ProcessorSupplier printProcessorSupplier(final String topic) { return () -> new AbstractProcessor() { private int numRecordsProcessed = 0; @Override public void init(final ProcessorContext context) { - System.out.println("[2.6] initializing processor: topic=data taskId=" + context.taskId()); + System.out.println("[2.6] initializing processor: topic=" + topic + " taskId=" + context.taskId()); numRecordsProcessed = 0; } @@ -75,7 +108,7 @@ public class StreamsUpgradeTest { public void process(final K key, final V value) { numRecordsProcessed++; if (numRecordsProcessed % 100 == 0) { - System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); } } diff --git a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index ac83cd95eba..ac7482cfb2d 100644 --- a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import 
org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkEntry; public class SmokeTestDriver extends SmokeTestUtil { - private static final String[] TOPICS = { + private static final String[] NUMERIC_VALUE_TOPICS = { "data", "echo", "max", @@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil { "avg", "tagg" }; + private static final String[] STRING_VALUE_TOPICS = { + "fk" + }; + private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length]; + static { + System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length); + System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length); + } private static final int MAX_RECORD_EMPTY_RETRIES = 30; @@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil { stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); - producer.send(record); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord); + numRecordsProduced++; if (numRecordsProduced % 100 == 0) { System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); @@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil { final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; - List> needRetry = new ArrayList<>(); + final List> dataNeedRetry = new ArrayList<>(); + final List> fkNeedRetry = new ArrayList<>(); try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { while (remaining > 0) { @@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil { remaining--; data[index] = data[remaining]; } else { - final ProducerRecord record = new ProducerRecord<>( "data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); + producer.send(record, new TestCallback(record, dataNeedRetry)); - producer.send(record, new TestCallback(record, needRetry)); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry)); numRecordsProduced++; allData.get(key).add(value); @@ -195,21 +218,19 @@ public class SmokeTestDriver extends SmokeTestUtil { } producer.flush(); - int remainingRetries = 5; - while (!needRetry.isEmpty()) { - final List> needRetry2 = new ArrayList<>(); - for (final ProducerRecord record : needRetry) { - System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); - producer.send(record, new TestCallback(record, needRetry2)); - } - producer.flush(); - needRetry = needRetry2; + retry(producer, dataNeedRetry, stringSerde); + retry(producer, fkNeedRetry, intSerde); - if (--remainingRetries == 0 && !needRetry.isEmpty()) { - System.err.println("Failed to produce all records after multiple retries"); - Exit.exit(1); - } - } + flush(producer, + "data", + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + ); + flush(producer, + 
"fk", + intSerde.serializer().serialize("", 0), + stringSerde.serializer().serialize("", "flush") + ); // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out // all suppressed records. @@ -227,6 +248,44 @@ public class SmokeTestDriver extends SmokeTestUtil { return Collections.unmodifiableMap(allData); } + private static void retry(final KafkaProducer producer, + List> needRetry, + final Serde keySerde) { + int remainingRetries = 5; + while (!needRetry.isEmpty()) { + final List> needRetry2 = new ArrayList<>(); + for (final ProducerRecord record : needRetry) { + System.out.println( + "retry producing " + keySerde.deserializer().deserialize("", record.key())); + producer.send(record, new TestCallback(record, needRetry2)); + } + producer.flush(); + needRetry = needRetry2; + if (--remainingRetries == 0 && !needRetry.isEmpty()) { + System.err.println("Failed to produce all records after multiple retries"); + Exit.exit(1); + } + } + } + + private static void flush(final KafkaProducer producer, + final String topic, + final byte[] keyBytes, + final byte[] valBytes) { + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. + final List partitions = producer.partitionsFor(topic); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + keyBytes, + valBytes + )); + } + } + private static Properties generatorProperties(final String kafka) { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); @@ -315,14 +374,14 @@ public class SmokeTestDriver extends SmokeTestUtil { props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, TOPICS); + final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS); consumer.assign(partitions); consumer.seekToBeginning(partitions); final int recordsGenerated = inputs.size() * maxRecordsPerKey; int recordsProcessed = 0; final Map processed = - Stream.of(TOPICS) + Stream.of(NUMERIC_VALUE_TOPICS) .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); final Map>>> events = new HashMap<>(); diff --git a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 6f485e694cf..32d8d9408f5 100644 --- a/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-27/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,11 +16,18 @@ */ package org.apache.kafka.streams.tests; +import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; +import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; + +import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; import 
org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -42,12 +49,29 @@ public class StreamsUpgradeTest { System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); - final KStream dataStream = builder.stream("data"); - dataStream.process(printProcessorSupplier()); + final KTable dataTable = builder.table( + "data", Consumed.with(stringSerde, intSerde)); + final KStream dataStream = dataTable.toStream(); + dataStream.process(printProcessorSupplier("data")); dataStream.to("echo"); + final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty( + "test.run_fk_join", + "false")); + if (runFkJoin) { + try { + final KTable fkTable = builder.table( + "fk", Consumed.with(intSerde, stringSerde)); + buildFKTable(dataTable, fkTable); + } catch (final Exception e) { + System.err.println("Caught " + e.getMessage()); + } + } + final Properties config = new Properties(); - config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty( + StreamsConfig.APPLICATION_ID_CONFIG, + "StreamsUpgradeTest-" + new Random().nextLong()); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.putAll(streamsProperties); @@ -61,13 +85,22 @@ public class StreamsUpgradeTest { })); } - private static ProcessorSupplier printProcessorSupplier() { + private static void buildFKTable(final KTable primaryTable, + final KTable otherTable) { + final KStream kStream = primaryTable + .join(otherTable, v -> v, (k0, v0) -> v0) + .toStream(); + kStream.process(printProcessorSupplier("fk")); + kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); + } + + private static ProcessorSupplier printProcessorSupplier(final String topic) { return () -> new AbstractProcessor() { private int numRecordsProcessed = 0; @Override public void init(final ProcessorContext context) { - System.out.println("[2.7] initializing processor: topic=data taskId=" + context.taskId()); + System.out.println("[2.7] initializing processor: topic=" + topic + " taskId=" + context.taskId()); numRecordsProcessed = 0; } @@ -75,7 +108,7 @@ public class StreamsUpgradeTest { public void process(final K key, final V value) { numRecordsProcessed++; if (numRecordsProcessed % 100 == 0) { - System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); } } diff --git a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index ac83cd95eba..4dae6eae575 100644 --- a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap; import static 
org.apache.kafka.common.utils.Utils.mkEntry; public class SmokeTestDriver extends SmokeTestUtil { - private static final String[] TOPICS = { + private static final String[] NUMERIC_VALUE_TOPICS = { "data", "echo", "max", @@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil { "avg", "tagg" }; + private static final String[] STRING_VALUE_TOPICS = { + "fk" + }; + private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length]; + static { + System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length); + System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length); + } private static final int MAX_RECORD_EMPTY_RETRIES = 30; @@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil { stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); - producer.send(record); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord); + numRecordsProduced++; if (numRecordsProduced % 100 == 0) { System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); @@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil { final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; - List> needRetry = new ArrayList<>(); + final List> dataNeedRetry = new ArrayList<>(); + final List> fkNeedRetry = new ArrayList<>(); try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { while (remaining > 0) { @@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil { remaining--; data[index] = data[remaining]; } else { - final ProducerRecord record = new ProducerRecord<>( "data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); + producer.send(record, new TestCallback(record, dataNeedRetry)); - producer.send(record, new TestCallback(record, needRetry)); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry)); numRecordsProduced++; allData.get(key).add(value); @@ -195,38 +218,61 @@ public class SmokeTestDriver extends SmokeTestUtil { } producer.flush(); - int remainingRetries = 5; - while (!needRetry.isEmpty()) { - final List> needRetry2 = new ArrayList<>(); - for (final ProducerRecord record : needRetry) { - System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); - producer.send(record, new TestCallback(record, needRetry2)); - } - producer.flush(); - needRetry = needRetry2; + retry(producer, dataNeedRetry, stringSerde); + retry(producer, fkNeedRetry, intSerde); - if (--remainingRetries == 0 && !needRetry.isEmpty()) { - System.err.println("Failed to produce all records after multiple retries"); - Exit.exit(1); - } - } - - // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out - // all suppressed records. 
- final List partitions = producer.partitionsFor("data"); - for (final PartitionInfo partition : partitions) { - producer.send(new ProducerRecord<>( - partition.topic(), - partition.partition(), - System.currentTimeMillis() + Duration.ofDays(2).toMillis(), - stringSerde.serializer().serialize("", "flush"), - intSerde.serializer().serialize("", 0) - )); - } + flush(producer, + "data", + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + ); + flush(producer, + "fk", + intSerde.serializer().serialize("", 0), + stringSerde.serializer().serialize("", "flush") + ); } return Collections.unmodifiableMap(allData); } + private static void retry(final KafkaProducer producer, + List> needRetry, + final Serde keySerde) { + int remainingRetries = 5; + while (!needRetry.isEmpty()) { + final List> needRetry2 = new ArrayList<>(); + for (final ProducerRecord record : needRetry) { + System.out.println( + "retry producing " + keySerde.deserializer().deserialize("", record.key())); + producer.send(record, new TestCallback(record, needRetry2)); + } + producer.flush(); + needRetry = needRetry2; + if (--remainingRetries == 0 && !needRetry.isEmpty()) { + System.err.println("Failed to produce all records after multiple retries"); + Exit.exit(1); + } + } + } + + private static void flush(final KafkaProducer producer, + final String topic, + final byte[] keyBytes, + final byte[] valBytes) { + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. + final List partitions = producer.partitionsFor(topic); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + keyBytes, + valBytes + )); + } + } + private static Properties generatorProperties(final String kafka) { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); @@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil { props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, TOPICS); + final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS); consumer.assign(partitions); consumer.seekToBeginning(partitions); final int recordsGenerated = inputs.size() * maxRecordsPerKey; int recordsProcessed = 0; final Map processed = - Stream.of(TOPICS) + Stream.of(NUMERIC_VALUE_TOPICS) .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); final Map>>> events = new HashMap<>(); diff --git a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 4f2825d23d6..db17d73bcba 100644 --- a/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,11 +16,18 @@ */ package org.apache.kafka.streams.tests; +import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; +import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; + +import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import 
org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -42,12 +49,29 @@ public class StreamsUpgradeTest { System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); - final KStream dataStream = builder.stream("data"); - dataStream.process(printProcessorSupplier()); + final KTable dataTable = builder.table( + "data", Consumed.with(stringSerde, intSerde)); + final KStream dataStream = dataTable.toStream(); + dataStream.process(printProcessorSupplier("data")); dataStream.to("echo"); + final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty( + "test.run_fk_join", + "false")); + if (runFkJoin) { + try { + final KTable fkTable = builder.table( + "fk", Consumed.with(intSerde, stringSerde)); + buildFKTable(dataTable, fkTable); + } catch (final Exception e) { + System.err.println("Caught " + e.getMessage()); + } + } + final Properties config = new Properties(); - config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty( + StreamsConfig.APPLICATION_ID_CONFIG, + "StreamsUpgradeTest-" + new Random().nextLong()); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); @@ -61,13 +85,22 @@ public class StreamsUpgradeTest { })); } - private static ProcessorSupplier printProcessorSupplier() { + private static void buildFKTable(final KTable primaryTable, + final KTable otherTable) { + final KStream kStream = primaryTable + .join(otherTable, v -> v, (k0, v0) -> v0) + .toStream(); + kStream.process(printProcessorSupplier("fk")); + kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); + } + + private static ProcessorSupplier printProcessorSupplier(final String topic) { return () -> new AbstractProcessor() { private int numRecordsProcessed = 0; @Override public void init(final ProcessorContext context) { - System.out.println("[2.8] initializing processor: topic=data taskId=" + context.taskId()); + System.out.println("[2.8] initializing processor: topic=" + topic + " taskId=" + context.taskId()); numRecordsProcessed = 0; } @@ -75,7 +108,7 @@ public class StreamsUpgradeTest { public void process(final K key, final V value) { numRecordsProcessed++; if (numRecordsProcessed % 100 == 0) { - System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); } } diff --git a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index ac83cd95eba..4dae6eae575 100644 --- a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import 
org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkEntry; public class SmokeTestDriver extends SmokeTestUtil { - private static final String[] TOPICS = { + private static final String[] NUMERIC_VALUE_TOPICS = { "data", "echo", "max", @@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil { "avg", "tagg" }; + private static final String[] STRING_VALUE_TOPICS = { + "fk" + }; + private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length]; + static { + System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length); + System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length); + } private static final int MAX_RECORD_EMPTY_RETRIES = 30; @@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil { stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); - producer.send(record); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord); + numRecordsProduced++; if (numRecordsProduced % 100 == 0) { System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); @@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil { final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; - List> needRetry = new ArrayList<>(); + final List> dataNeedRetry = new ArrayList<>(); + final List> fkNeedRetry = new ArrayList<>(); try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { while (remaining > 0) { @@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil { remaining--; data[index] = data[remaining]; } else { - final ProducerRecord record = new ProducerRecord<>( "data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); + producer.send(record, new TestCallback(record, dataNeedRetry)); - producer.send(record, new TestCallback(record, needRetry)); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry)); numRecordsProduced++; allData.get(key).add(value); @@ -195,38 +218,61 @@ public class SmokeTestDriver extends SmokeTestUtil { } producer.flush(); - int remainingRetries = 5; - while (!needRetry.isEmpty()) { - final List> needRetry2 = new ArrayList<>(); - for (final ProducerRecord record : needRetry) { - System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); - producer.send(record, new TestCallback(record, needRetry2)); - } - producer.flush(); - needRetry = needRetry2; + retry(producer, dataNeedRetry, stringSerde); + retry(producer, fkNeedRetry, intSerde); - if (--remainingRetries == 0 && !needRetry.isEmpty()) { - System.err.println("Failed to produce all records after multiple retries"); - Exit.exit(1); - } - } - - // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out - // all suppressed records. 
- final List partitions = producer.partitionsFor("data"); - for (final PartitionInfo partition : partitions) { - producer.send(new ProducerRecord<>( - partition.topic(), - partition.partition(), - System.currentTimeMillis() + Duration.ofDays(2).toMillis(), - stringSerde.serializer().serialize("", "flush"), - intSerde.serializer().serialize("", 0) - )); - } + flush(producer, + "data", + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + ); + flush(producer, + "fk", + intSerde.serializer().serialize("", 0), + stringSerde.serializer().serialize("", "flush") + ); } return Collections.unmodifiableMap(allData); } + private static void retry(final KafkaProducer producer, + List> needRetry, + final Serde keySerde) { + int remainingRetries = 5; + while (!needRetry.isEmpty()) { + final List> needRetry2 = new ArrayList<>(); + for (final ProducerRecord record : needRetry) { + System.out.println( + "retry producing " + keySerde.deserializer().deserialize("", record.key())); + producer.send(record, new TestCallback(record, needRetry2)); + } + producer.flush(); + needRetry = needRetry2; + if (--remainingRetries == 0 && !needRetry.isEmpty()) { + System.err.println("Failed to produce all records after multiple retries"); + Exit.exit(1); + } + } + } + + private static void flush(final KafkaProducer producer, + final String topic, + final byte[] keyBytes, + final byte[] valBytes) { + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. + final List partitions = producer.partitionsFor(topic); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + keyBytes, + valBytes + )); + } + } + private static Properties generatorProperties(final String kafka) { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); @@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil { props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, TOPICS); + final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS); consumer.assign(partitions); consumer.seekToBeginning(partitions); final int recordsGenerated = inputs.size() * maxRecordsPerKey; int recordsProcessed = 0; final Map processed = - Stream.of(TOPICS) + Stream.of(NUMERIC_VALUE_TOPICS) .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); final Map>>> events = new HashMap<>(); diff --git a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index b097de71d41..0751516d76c 100644 --- a/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-30/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,11 +16,18 @@ */ package org.apache.kafka.streams.tests; +import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; +import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; + +import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import 
org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; @@ -44,12 +51,29 @@ public class StreamsUpgradeTest { System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); - final KStream dataStream = builder.stream("data"); - dataStream.process(printProcessorSupplier()); + final KTable dataTable = builder.table( + "data", Consumed.with(stringSerde, intSerde)); + final KStream dataStream = dataTable.toStream(); + dataStream.process(printProcessorSupplier("data")); dataStream.to("echo"); + final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty( + "test.run_fk_join", + "false")); + if (runFkJoin) { + try { + final KTable fkTable = builder.table( + "fk", Consumed.with(intSerde, stringSerde)); + buildFKTable(dataTable, fkTable); + } catch (final Exception e) { + System.err.println("Caught " + e.getMessage()); + } + } + final Properties config = new Properties(); - config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty( + StreamsConfig.APPLICATION_ID_CONFIG, + "StreamsUpgradeTest-" + new Random().nextLong()); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); @@ -63,13 +87,22 @@ public class StreamsUpgradeTest { })); } - private static ProcessorSupplier printProcessorSupplier() { + private static void buildFKTable(final KTable primaryTable, + final KTable otherTable) { + final KStream kStream = primaryTable + .join(otherTable, v -> v, (k0, v0) -> v0) + .toStream(); + kStream.process(printProcessorSupplier("fk")); + kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); + } + + private static ProcessorSupplier printProcessorSupplier(final String topic) { return () -> new ContextualProcessor() { private int numRecordsProcessed = 0; @Override public void init(final ProcessorContext context) { - System.out.println("[3.0] initializing processor: topic=data taskId=" + context.taskId()); + System.out.println("[3.0] initializing processor: topic=" + topic + " taskId=" + context.taskId()); numRecordsProcessed = 0; } @@ -77,7 +110,7 @@ public class StreamsUpgradeTest { public void process(final Record record) { numRecordsProcessed++; if (numRecordsProcessed % 100 == 0) { - System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); } } diff --git a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index ac83cd95eba..dbacbb9625b 100644 --- a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArraySerializer; import 
org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; @@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkEntry; public class SmokeTestDriver extends SmokeTestUtil { - private static final String[] TOPICS = { + private static final String[] NUMERIC_VALUE_TOPICS = { "data", "echo", "max", @@ -72,6 +73,15 @@ public class SmokeTestDriver extends SmokeTestUtil { "avg", "tagg" }; + private static final String[] STRING_VALUE_TOPICS = { + "fk" + }; + + private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length]; + static { + System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length); + System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length); + } private static final int MAX_RECORD_EMPTY_RETRIES = 30; @@ -130,9 +140,16 @@ public class SmokeTestDriver extends SmokeTestUtil { stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value) ); - producer.send(record); + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + producer.send(fkRecord); + numRecordsProduced++; if (numRecordsProduced % 100 == 0) { System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); @@ -148,7 +165,6 @@ public class SmokeTestDriver extends SmokeTestUtil { final Duration timeToSpend) { final Properties producerProps = generatorProperties(kafka); - int numRecordsProduced = 0; final Map> allData = new HashMap<>(); @@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil { final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; - List> needRetry = new ArrayList<>(); + final List> dataNeedRetry = new ArrayList<>(); + final List> fkNeedRetry = new ArrayList<>(); try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) { while (remaining > 0) { @@ -175,7 +192,6 @@ public class SmokeTestDriver extends SmokeTestUtil { remaining--; data[index] = data[remaining]; } else { - final ProducerRecord record = new ProducerRecord<>( "data", @@ -183,7 +199,16 @@ public class SmokeTestDriver extends SmokeTestUtil { intSerde.serializer().serialize("", value) ); - producer.send(record, new TestCallback(record, needRetry)); + producer.send(record, new TestCallback(record, dataNeedRetry)); + + final ProducerRecord fkRecord = + new ProducerRecord<>( + "fk", + intSerde.serializer().serialize("", value), + stringSerde.serializer().serialize("", key) + ); + + producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry)); numRecordsProduced++; allData.get(key).add(value); @@ -195,38 +220,61 @@ public class SmokeTestDriver extends SmokeTestUtil { } producer.flush(); - int remainingRetries = 5; - while (!needRetry.isEmpty()) { - final List> needRetry2 = new ArrayList<>(); - for (final ProducerRecord record : needRetry) { - System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key())); - producer.send(record, new TestCallback(record, needRetry2)); - } - producer.flush(); - needRetry = needRetry2; + retry(producer, dataNeedRetry, stringSerde); + retry(producer, fkNeedRetry, intSerde); - if (--remainingRetries == 0 && 
!needRetry.isEmpty()) { - System.err.println("Failed to produce all records after multiple retries"); - Exit.exit(1); - } - } - - // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out - // all suppressed records. - final List partitions = producer.partitionsFor("data"); - for (final PartitionInfo partition : partitions) { - producer.send(new ProducerRecord<>( - partition.topic(), - partition.partition(), - System.currentTimeMillis() + Duration.ofDays(2).toMillis(), - stringSerde.serializer().serialize("", "flush"), - intSerde.serializer().serialize("", 0) - )); - } + flush(producer, + "data", + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + ); + flush(producer, + "fk", + intSerde.serializer().serialize("", 0), + stringSerde.serializer().serialize("", "flush") + ); } return Collections.unmodifiableMap(allData); } + private static void retry(final KafkaProducer producer, + List> needRetry, + final Serde keySerde) { + int remainingRetries = 5; + while (!needRetry.isEmpty()) { + final List> needRetry2 = new ArrayList<>(); + for (final ProducerRecord record : needRetry) { + System.out.println( + "retry producing " + keySerde.deserializer().deserialize("", record.key())); + producer.send(record, new TestCallback(record, needRetry2)); + } + producer.flush(); + needRetry = needRetry2; + if (--remainingRetries == 0 && !needRetry.isEmpty()) { + System.err.println("Failed to produce all records after multiple retries"); + Exit.exit(1); + } + } + } + + private static void flush(final KafkaProducer producer, + final String topic, + final byte[] keyBytes, + final byte[] valBytes) { + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. 
+ final List partitions = producer.partitionsFor(topic); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + keyBytes, + valBytes + )); + } + } + private static Properties generatorProperties(final String kafka) { final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); @@ -315,14 +363,14 @@ public class SmokeTestDriver extends SmokeTestUtil { props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, TOPICS); + final List partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS); consumer.assign(partitions); consumer.seekToBeginning(partitions); final int recordsGenerated = inputs.size() * maxRecordsPerKey; int recordsProcessed = 0; final Map processed = - Stream.of(TOPICS) + Stream.of(NUMERIC_VALUE_TOPICS) .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); final Map>>> events = new HashMap<>(); diff --git a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 6657c5f2f23..311d30ba400 100644 --- a/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/upgrade-system-tests-31/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -16,11 +16,18 @@ */ package org.apache.kafka.streams.tests; +import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde; +import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde; + +import java.util.Random; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; @@ -44,12 +51,29 @@ public class StreamsUpgradeTest { System.out.println("props=" + streamsProperties); final StreamsBuilder builder = new StreamsBuilder(); - final KStream dataStream = builder.stream("data"); - dataStream.process(printProcessorSupplier()); + final KTable dataTable = builder.table( + "data", Consumed.with(stringSerde, intSerde)); + final KStream dataStream = dataTable.toStream(); + dataStream.process(printProcessorSupplier("data")); dataStream.to("echo"); + final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty( + "test.run_fk_join", + "false")); + if (runFkJoin) { + try { + final KTable fkTable = builder.table( + "fk", Consumed.with(intSerde, stringSerde)); + buildFKTable(dataStream, fkTable); + } catch (final Exception e) { + System.err.println("Caught " + e.getMessage()); + } + } + final Properties config = new Properties(); - config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); + config.setProperty( + StreamsConfig.APPLICATION_ID_CONFIG, + "StreamsUpgradeTest-" + new Random().nextLong()); 
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.putAll(streamsProperties); @@ -63,13 +87,22 @@ public class StreamsUpgradeTest { })); } - private static ProcessorSupplier printProcessorSupplier() { + private static void buildFKTable(final KStream primaryTable, + final KTable otherTable) { + final KStream kStream = primaryTable.toTable() + .join(otherTable, v -> v, (k0, v0) -> v0) + .toStream(); + kStream.process(printProcessorSupplier("fk")); + kStream.to("fk-result", Produced.with(stringSerde, stringSerde)); + } + + private static ProcessorSupplier printProcessorSupplier(final String topic) { return () -> new ContextualProcessor() { private int numRecordsProcessed = 0; @Override public void init(final ProcessorContext context) { - System.out.println("[3.1] initializing processor: topic=data taskId=" + context.taskId()); + System.out.println("[3.1] initializing processor: topic=" + topic + "taskId=" + context.taskId()); numRecordsProcessed = 0; } @@ -77,7 +110,7 @@ public class StreamsUpgradeTest { public void process(final Record record) { numRecordsProcessed++; if (numRecordsProcessed % 100 == 0) { - System.out.println("processed " + numRecordsProcessed + " records from topic=data"); + System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic); } } diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 5dedc579163..38b303281d2 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -22,7 +22,7 @@ from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.kafka import KafkaConfig from kafkatest.services.monitor.jmx import JmxMixin -from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1 +from kafkatest.version import KafkaVersion, LATEST_0_10_0, LATEST_0_10_1 STATE_DIR = "state.dir" @@ -616,6 +616,9 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService): if self.UPGRADE_FROM is not None: properties['upgrade.from'] = self.UPGRADE_FROM + if (self.UPGRADE_FROM is not None and KafkaVersion(self.UPGRADE_FROM).supports_fk_joins()) or \ + (self.KAFKA_STREAMS_VERSION is not None and KafkaVersion(self.KAFKA_STREAMS_VERSION).supports_fk_joins()): + properties['test.run_fk_join'] = "true" if self.UPGRADE_TO == "future_version": properties['test.future.metadata'] = "any_value" diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 57c89aa4c83..7639b33f1ed 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -37,6 +37,8 @@ broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), metadata_1_versions = [str(LATEST_0_10_0)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] +fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), + str(LATEST_3_0), str(LATEST_3_1)] """ After each release one should first check that the released version has been uploaded to @@ -86,9 +88,11 @@ class StreamsUpgradeTest(Test): self.topics = { 'echo' : { 'partitions': 5 }, 'data' : { 'partitions': 5 }, + 'fk' : { 'partitions': 5 }, } - processed_msg = "processed [0-9]* records" + processed_data_msg = "processed [0-9]* records from topic=data" + processed_fk_msg = "processed [0-9]* records from topic=fk" base_version_number = 
str(DEV_VERSION).split("-")[0] def perform_broker_upgrade(self, to_version): @@ -159,9 +163,9 @@ class StreamsUpgradeTest(Test): with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: processor.start() - monitor.wait_until(self.processed_msg, + monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(processor.node)) + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(processor.node)) connected_message = "Discovered group coordinator" with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor: @@ -172,9 +176,9 @@ class StreamsUpgradeTest(Test): timeout_sec=120, err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account)) - stdout_monitor.wait_until(self.processed_msg, + stdout_monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node.account)) + err_msg="Never saw output '%s' on" % self.processed_data_msg + str(processor.node.account)) # SmokeTestDriver allows up to 6 minutes to consume all # records for the verification step so this timeout is set to @@ -192,8 +196,12 @@ class StreamsUpgradeTest(Test): @cluster(num_nodes=6) @matrix(from_version=metadata_1_versions, to_version=[str(DEV_VERSION)]) @matrix(from_version=metadata_2_versions, to_version=[str(DEV_VERSION)]) - def test_metadata_upgrade(self, from_version, to_version): + @matrix(from_version=fk_join_versions, to_version=[str(DEV_VERSION)]) + def test_rolling_upgrade_with_2_bounces(self, from_version, to_version): """ + This test verifies that the cluster successfully upgrades despite changes in the metadata and FK + join protocols. + Starts 3 KafkaStreams instances with version and upgrades one-by-one to """ @@ -311,9 +319,14 @@ class StreamsUpgradeTest(Test): log_monitor.wait_until(kafka_version_str, timeout_sec=60, err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account)) - monitor.wait_until(self.processed_msg, + monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account)) + if KafkaVersion(version).supports_fk_joins(): + monitor.wait_until(self.processed_fk_msg, + timeout_sec=60, + err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node1.account)) + # start second with self.prepare_for(self.processor2, version) @@ -325,12 +338,16 @@ class StreamsUpgradeTest(Test): log_monitor.wait_until(kafka_version_str, timeout_sec=60, err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account)) - first_monitor.wait_until(self.processed_msg, + first_monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) - second_monitor.wait_until(self.processed_msg, + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account)) + second_monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account)) + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node2.account)) + if KafkaVersion(version).supports_fk_joins(): + second_monitor.wait_until(self.processed_fk_msg, + timeout_sec=60, + err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node2.account)) # start third with 
self.prepare_for(self.processor3, version) @@ -343,15 +360,19 @@ class StreamsUpgradeTest(Test): log_monitor.wait_until(kafka_version_str, timeout_sec=60, err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account)) - first_monitor.wait_until(self.processed_msg, + first_monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) - second_monitor.wait_until(self.processed_msg, + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account)) + second_monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account)) - third_monitor.wait_until(self.processed_msg, + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node2.account)) + third_monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account)) + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node3.account)) + if KafkaVersion(version).supports_fk_joins(): + third_monitor.wait_until(self.processed_fk_msg, + timeout_sec=60, + err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node3.account)) @staticmethod def prepare_for(processor, version): @@ -381,12 +402,12 @@ class StreamsUpgradeTest(Test): with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor: with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor: processor.stop() - first_other_monitor.wait_until(self.processed_msg, + first_other_monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account)) - second_other_monitor.wait_until(self.processed_msg, + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(first_other_node.account)) + second_other_monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account)) + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(second_other_node.account)) node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False) if upgrade_from is None: # upgrade disabled -- second round of rolling bounces @@ -414,23 +435,23 @@ class StreamsUpgradeTest(Test): log_monitor.wait_until(kafka_version_str, timeout_sec=60, err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account)) - first_other_monitor.wait_until(self.processed_msg, + first_other_monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account)) + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(first_other_node.account)) found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True)) if len(found) > 0: raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'") - second_other_monitor.wait_until(self.processed_msg, + second_other_monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account)) + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(second_other_node.account)) found =
list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True)) if len(found) > 0: raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'") - monitor.wait_until(self.processed_msg, + monitor.wait_until(self.processed_data_msg, timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node.account)) + err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node.account)) def do_rolling_bounce(self, processor, counter, current_generation): diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index 3a01a5adf58..64f0bf2c53d 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -106,6 +106,9 @@ class KafkaVersion(LooseVersion): # Self-managed clusters always support topic ID, so this method only applies to ZK clusters. return self >= V_2_8_0 + def supports_fk_joins(self): + return hasattr(self, "version") and self >= V_2_4_0 + def get_version(node=None): """Return the version attached to the given node. Default to DEV_BRANCH if node or node.version is undefined (aka None)
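The Streams-side changes above all wire up the same foreign-key join: the "data" table (String key, Integer value) is joined against the "fk" table (Integer key, String value) by using the numeric value as the foreign key, and the joined rows land in "fk-result". A minimal, self-contained sketch of that topology follows; the class name is illustrative and it uses the plain Serdes helpers rather than the test's SmokeTestUtil serdes, so it is a restatement under those assumptions, not code from the patch itself.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    public class FkJoinTopologySketch {
        public static Topology build() {
            final StreamsBuilder builder = new StreamsBuilder();

            // "data" is keyed by the smoke-test key (String) and carries numeric values.
            final KTable<String, Integer> dataTable =
                builder.table("data", Consumed.with(Serdes.String(), Serdes.Integer()));

            // "fk" inverts that relationship: keyed by the numeric value, carrying the original key.
            final KTable<Integer, String> fkTable =
                builder.table("fk", Consumed.with(Serdes.Integer(), Serdes.String()));

            // Foreign-key join: each data row's Integer value selects a key in fkTable;
            // the joiner keeps the right-hand String value, as in the diff's (k0, v0) -> v0.
            dataTable
                .join(fkTable, value -> value, (leftValue, rightValue) -> rightValue)
                .toStream()
                .to("fk-result", Produced.with(Serdes.String(), Serdes.String()));

            return builder.build();
        }
    }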
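On the driver side, every generated record is mirrored into the "fk" topic with key and value swapped, which is what makes the foreign-key lookup above resolvable. A short sketch of that pairing, assuming a hypothetical helper class and plain typed serializers in place of the driver's byte-array plumbing and retry callbacks:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class FkInputSketch {
        public static void send(final String bootstrapServers, final String key, final int value) {
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

            final StringSerializer stringSerializer = new StringSerializer();
            final IntegerSerializer intSerializer = new IntegerSerializer();

            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                // "data" record: key -> value, later read as KTable<String, Integer>.
                producer.send(new ProducerRecord<>(
                    "data",
                    stringSerializer.serialize("data", key),
                    intSerializer.serialize("data", value)));

                // Mirrored "fk" record: value -> key, later read as KTable<Integer, String>,
                // so the data table's Integer value resolves to a row in the fk table.
                producer.send(new ProducerRecord<>(
                    "fk",
                    intSerializer.serialize("fk", value),
                    stringSerializer.serialize("fk", key)));
            }
        }
    }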