MINOR: Add upgrade tests for FK joins (#12122)

Follow-up PR for KAFKA-13769.

Reviewers: Matthias J. Sax <matthias@confluent.io>
Author: Alex Sorokoumov, 2022-05-14 02:21:27 +02:00 (committed by GitHub)
parent f96e381387
commit 78dd40123c
18 changed files with 969 additions and 329 deletions
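
At a glance, the change makes SmokeTestDriver produce, next to every numeric record sent to the "data" topic, an inverted key/value record to a new "fk" topic, and it lets StreamsUpgradeTest build a foreign-key join between the "data" table and the "fk" table when the test.run_fk_join property is set. For orientation, here is a minimal, self-contained sketch of that foreign-key join topology; the class name and the direct use of Serdes.String()/Serdes.Integer() are illustrative stand-ins for the serdes the test classes take from SmokeTestUtil.

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    // Illustrative sketch only (class name is hypothetical): the "data" table is keyed by
    // String with Integer values, the "fk" table is keyed by Integer with String values,
    // so the Integer value of a "data" record acts as the foreign key into "fk".
    public class FkJoinTopologySketch {

        public static Topology build() {
            final Serde<String> stringSerde = Serdes.String();
            final Serde<Integer> intSerde = Serdes.Integer();

            final StreamsBuilder builder = new StreamsBuilder();
            final KTable<String, Integer> dataTable =
                builder.table("data", Consumed.with(stringSerde, intSerde));
            final KTable<Integer, String> fkTable =
                builder.table("fk", Consumed.with(intSerde, stringSerde));

            dataTable
                // foreignKeyExtractor: the record value is the key of the other table
                .join(fkTable, value -> value, (dataValue, fkValue) -> fkValue)
                .toStream()
                .to("fk-result", Produced.with(stringSerde, stringSerde));

            return builder.build();
        }
    }

Because the "fk" records in this PR carry the original "data" key as their value, the join presumably maps each key back to itself, which gives the smoke test a simple invariant to verify on the "fk-result" topic.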


@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;

public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
        "data",
        "echo",
        "max",
@@ -72,6 +73,15 @@ public class SmokeTestDriver extends SmokeTestUtil {
        "avg",
        "tagg"
    };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }

    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -163,7 +173,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;

-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();

        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            while (remaining > 0) {
@@ -183,7 +194,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                            intSerde.serializer().serialize("", value)
                        );
-                    producer.send(record, new TestCallback(record, needRetry));
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
+
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));

                    numRecordsProduced++;
                    allData.get(key).add(value);
@@ -195,38 +215,62 @@ public class SmokeTestDriver extends SmokeTestUtil {
            }
            producer.flush();

-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
-
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
-            }
-
-            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-            // all suppressed records.
-            final List<PartitionInfo> partitions = producer.partitionsFor("data");
-            for (final PartitionInfo partition : partitions) {
-                producer.send(new ProducerRecord<>(
-                    partition.topic(),
-                    partition.partition(),
-                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                    stringSerde.serializer().serialize("", "flush"),
-                    intSerde.serializer().serialize("", 0)
-                ));
-            }
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
        }
        return Collections.unmodifiableMap(allData);
    }

+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+                              List<ProducerRecord<byte[], byte[]>> needRetry,
+                              final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println("retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
+            }
+            producer.flush();
+            needRetry = needRetry2;
+
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
+            }
+        }
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+                              final String topic,
+                              final byte[] keyBytes,
+                              final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
+    }
+
    private static Properties generatorProperties(final String kafka) {
        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
@@ -315,14 +359,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);

-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
        int recordsProcessed = 0;
        final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));

        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();


@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;

public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
        "data",
        "echo",
        "max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
        "avg",
        "tagg"
    };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }

    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                stringSerde.serializer().serialize("", key),
                intSerde.serializer().serialize("", value)
            );
            producer.send(record);
+            final ProducerRecord<byte[], byte[]> fkRecord =
+                new ProducerRecord<>(
+                    "fk",
+                    intSerde.serializer().serialize("", value),
+                    stringSerde.serializer().serialize("", key)
+                );
+            producer.send(fkRecord);

            numRecordsProduced++;
            if (numRecordsProduced % 100 == 0) {
                System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -148,7 +164,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
                                                   final Duration timeToSpend) {
        final Properties producerProps = generatorProperties(kafka);

        int numRecordsProduced = 0;

        final Map<String, Set<Integer>> allData = new HashMap<>();
@@ -163,7 +178,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;

-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();

        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            while (remaining > 0) {
@@ -175,15 +191,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
                    remaining--;
                    data[index] = data[remaining];
                } else {
                    final ProducerRecord<byte[], byte[]> record =
                        new ProducerRecord<>(
                            "data",
                            stringSerde.serializer().serialize("", key),
                            intSerde.serializer().serialize("", value)
                        );
-                    producer.send(record, new TestCallback(record, needRetry));
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
+
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));

                    numRecordsProduced++;
                    allData.get(key).add(value);
@@ -195,38 +217,61 @@ public class SmokeTestDriver extends SmokeTestUtil {
            }
            producer.flush();

-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
-
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
-            }
-
-            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-            // all suppressed records.
-            final List<PartitionInfo> partitions = producer.partitionsFor("data");
-            for (final PartitionInfo partition : partitions) {
-                producer.send(new ProducerRecord<>(
-                    partition.topic(),
-                    partition.partition(),
-                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                    stringSerde.serializer().serialize("", "flush"),
-                    intSerde.serializer().serialize("", 0)
-                ));
-            }
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
        }
        return Collections.unmodifiableMap(allData);
    }

+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+                              List<ProducerRecord<byte[], byte[]>> needRetry,
+                              final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println(
+                    "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
+            }
+            producer.flush();
+            needRetry = needRetry2;
+
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
+            }
+        }
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+                              final String topic,
+                              final byte[] keyBytes,
+                              final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
+    }
+
    private static Properties generatorProperties(final String kafka) {
        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
@@ -315,14 +360,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);

-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
        int recordsProcessed = 0;
        final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));

        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();


@@ -16,11 +16,18 @@
 */
package org.apache.kafka.streams.tests;

+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
        System.out.println("props=" + streamsProperties);

        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
+        final KTable<String, Integer> dataTable = builder.table(
+            "data", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> dataStream = dataTable.toStream();
+        dataStream.process(printProcessorSupplier("data"));
        dataStream.to("echo");

+        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+            "test.run_fk_join",
+            "false"));
+        if (runFkJoin) {
+            try {
+                final KTable<Integer, String> fkTable = builder.table(
+                    "fk", Consumed.with(intSerde, stringSerde));
+                buildFKTable(dataTable, fkTable);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
+
        final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(
+            StreamsConfig.APPLICATION_ID_CONFIG,
+            "StreamsUpgradeTest-" + new Random().nextLong());
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
        config.putAll(streamsProperties);

@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
        }));
    }

-    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+    private static void buildFKTable(final KTable<String, Integer> primaryTable,
+                                     final KTable<Integer, String> otherTable) {
+        final KStream<String, String> kStream = primaryTable
+            .join(otherTable, v -> v, (k0, v0) -> v0)
+            .toStream();
+        kStream.process(printProcessorSupplier("fk"));
+        kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
        return () -> new AbstractProcessor<K, V>() {
            private int numRecordsProcessed = 0;

            @Override
            public void init(final ProcessorContext context) {
-                System.out.println("[2.4] initializing processor: topic=data taskId=" + context.taskId());
+                System.out.println("[2.4] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                numRecordsProcessed = 0;
            }

@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
            public void process(final K key, final V value) {
                numRecordsProcessed++;
                if (numRecordsProcessed % 100 == 0) {
-                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                }
            }
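
Note that the FK-join branch above is gated on a test.run_fk_join property read from the streams properties the test loads, and it defaults to false so that runs against older topologies are unaffected. The wiring that actually sets the flag presumably lives in this PR's system-test (ducktape) changes, which are not part of this excerpt; the following is only a hedged sketch of what enabling it from the driver side amounts to, with a hypothetical class and method name.

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.kafka.common.utils.Utils;

    // Hypothetical helper: load whatever property file StreamsUpgradeTest is pointed at
    // and switch the FK-join branch on before the test reads it.
    public final class EnableFkJoinSketch {
        public static Properties withFkJoinEnabled(final String propFileName) throws IOException {
            final Properties streamsProperties = Utils.loadProps(propFileName);
            streamsProperties.setProperty("test.run_fk_join", "true");
            return streamsProperties;
        }
    }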


@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;

public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
        "data",
        "echo",
        "max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
        "avg",
        "tagg"
    };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }

    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                stringSerde.serializer().serialize("", key),
                intSerde.serializer().serialize("", value)
            );
            producer.send(record);
+            final ProducerRecord<byte[], byte[]> fkRecord =
+                new ProducerRecord<>(
+                    "fk",
+                    intSerde.serializer().serialize("", value),
+                    stringSerde.serializer().serialize("", key)
+                );
+            producer.send(fkRecord);

            numRecordsProduced++;
            if (numRecordsProduced % 100 == 0) {
                System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;

-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();

        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            while (remaining > 0) {
@@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
                    remaining--;
                    data[index] = data[remaining];
                } else {
                    final ProducerRecord<byte[], byte[]> record =
                        new ProducerRecord<>(
                            "data",
                            stringSerde.serializer().serialize("", key),
                            intSerde.serializer().serialize("", value)
                        );
-                    producer.send(record, new TestCallback(record, needRetry));
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
+
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));

                    numRecordsProduced++;
                    allData.get(key).add(value);
@@ -195,38 +218,61 @@ public class SmokeTestDriver extends SmokeTestUtil {
            }
            producer.flush();

-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
-
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
-            }
-
-            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-            // all suppressed records.
-            final List<PartitionInfo> partitions = producer.partitionsFor("data");
-            for (final PartitionInfo partition : partitions) {
-                producer.send(new ProducerRecord<>(
-                    partition.topic(),
-                    partition.partition(),
-                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                    stringSerde.serializer().serialize("", "flush"),
-                    intSerde.serializer().serialize("", 0)
-                ));
-            }
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
        }
        return Collections.unmodifiableMap(allData);
    }

+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+                              List<ProducerRecord<byte[], byte[]>> needRetry,
+                              final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println(
+                    "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
+            }
+            producer.flush();
+            needRetry = needRetry2;
+
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
+            }
+        }
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+                              final String topic,
+                              final byte[] keyBytes,
+                              final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
+    }
+
    private static Properties generatorProperties(final String kafka) {
        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
@@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);

-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
        int recordsProcessed = 0;
        final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));

        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();


@@ -16,11 +16,18 @@
 */
package org.apache.kafka.streams.tests;

+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
        System.out.println("props=" + streamsProperties);

        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
+        final KTable<String, Integer> dataTable = builder.table(
+            "data", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> dataStream = dataTable.toStream();
+        dataStream.process(printProcessorSupplier("data"));
        dataStream.to("echo");

+        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+            "test.run_fk_join",
+            "false"));
+        if (runFkJoin) {
+            try {
+                final KTable<Integer, String> fkTable = builder.table(
+                    "fk", Consumed.with(intSerde, stringSerde));
+                buildFKTable(dataTable, fkTable);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
+
        final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(
+            StreamsConfig.APPLICATION_ID_CONFIG,
+            "StreamsUpgradeTest-" + new Random().nextLong());
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
        config.putAll(streamsProperties);

@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
        }));
    }

-    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+    private static void buildFKTable(final KTable<String, Integer> primaryTable,
+                                     final KTable<Integer, String> otherTable) {
+        final KStream<String, String> kStream = primaryTable
+            .join(otherTable, v -> v, (k0, v0) -> v0)
+            .toStream();
+        kStream.process(printProcessorSupplier("fk"));
+        kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
        return () -> new AbstractProcessor<K, V>() {
            private int numRecordsProcessed = 0;

            @Override
            public void init(final ProcessorContext context) {
-                System.out.println("[2.5] initializing processor: topic=data taskId=" + context.taskId());
+                System.out.println("[2.5] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                numRecordsProcessed = 0;
            }

@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
            public void process(final K key, final V value) {
                numRecordsProcessed++;
                if (numRecordsProcessed % 100 == 0) {
-                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                }
            }


@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;

public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
        "data",
        "echo",
        "max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
        "avg",
        "tagg"
    };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }

    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                stringSerde.serializer().serialize("", key),
                intSerde.serializer().serialize("", value)
            );
            producer.send(record);
+            final ProducerRecord<byte[], byte[]> fkRecord =
+                new ProducerRecord<>(
+                    "fk",
+                    intSerde.serializer().serialize("", value),
+                    stringSerde.serializer().serialize("", key)
+                );
+            producer.send(fkRecord);

            numRecordsProduced++;
            if (numRecordsProduced % 100 == 0) {
                System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;

-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();

        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            while (remaining > 0) {
@@ -175,7 +192,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
                    remaining--;
                    data[index] = data[remaining];
                } else {
                    final ProducerRecord<byte[], byte[]> record =
                        new ProducerRecord<>(
                            "data",
@@ -183,7 +199,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                            intSerde.serializer().serialize("", value)
                        );
-                    producer.send(record, new TestCallback(record, needRetry));
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
+
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));

                    numRecordsProduced++;
                    allData.get(key).add(value);
@@ -195,38 +220,61 @@ public class SmokeTestDriver extends SmokeTestUtil {
            }
            producer.flush();

-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
-
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
-            }
-
-            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
-            // all suppressed records.
-            final List<PartitionInfo> partitions = producer.partitionsFor("data");
-            for (final PartitionInfo partition : partitions) {
-                producer.send(new ProducerRecord<>(
-                    partition.topic(),
-                    partition.partition(),
-                    System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
-                    stringSerde.serializer().serialize("", "flush"),
-                    intSerde.serializer().serialize("", 0)
-                ));
-            }
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );
        }
        return Collections.unmodifiableMap(allData);
    }

+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+                              List<ProducerRecord<byte[], byte[]>> needRetry,
+                              final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println(
+                    "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
+            }
+            producer.flush();
+            needRetry = needRetry2;
+
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
+            }
+        }
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+                              final String topic,
+                              final byte[] keyBytes,
+                              final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
+    }
+
    private static Properties generatorProperties(final String kafka) {
        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
@@ -315,14 +363,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);

-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
        int recordsProcessed = 0;
        final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));

        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();


@@ -16,11 +16,18 @@
 */
package org.apache.kafka.streams.tests;

+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+import java.util.Random;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
        System.out.println("props=" + streamsProperties);

        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream dataStream = builder.stream("data");
-        dataStream.process(printProcessorSupplier());
+        final KTable<String, Integer> dataTable = builder.table(
+            "data", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> dataStream = dataTable.toStream();
+        dataStream.process(printProcessorSupplier("data"));
        dataStream.to("echo");

+        final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+            "test.run_fk_join",
+            "false"));
+        if (runFkJoin) {
+            try {
+                final KTable<Integer, String> fkTable = builder.table(
+                    "fk", Consumed.with(intSerde, stringSerde));
+                buildFKTable(dataTable, fkTable);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
+
        final Properties config = new Properties();
-        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(
+            StreamsConfig.APPLICATION_ID_CONFIG,
+            "StreamsUpgradeTest-" + new Random().nextLong());
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
        config.putAll(streamsProperties);

@@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
        }));
    }

-    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+    private static void buildFKTable(final KTable<String, Integer> primaryTable,
+                                     final KTable<Integer, String> otherTable) {
+        final KStream<String, String> kStream = primaryTable
+            .join(otherTable, v -> v, (k0, v0) -> v0)
+            .toStream();
+        kStream.process(printProcessorSupplier("fk"));
+        kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
        return () -> new AbstractProcessor<K, V>() {
            private int numRecordsProcessed = 0;

            @Override
            public void init(final ProcessorContext context) {
-                System.out.println("[2.6] initializing processor: topic=data taskId=" + context.taskId());
+                System.out.println("[2.6] initializing processor: topic=" + topic + " taskId=" + context.taskId());
                numRecordsProcessed = 0;
            }

@@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
            public void process(final K key, final V value) {
                numRecordsProcessed++;
                if (numRecordsProcessed % 100 == 0) {
-                    System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                    System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
                }
            }


@@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
@@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;

public class SmokeTestDriver extends SmokeTestUtil {
-    private static final String[] TOPICS = {
+    private static final String[] NUMERIC_VALUE_TOPICS = {
        "data",
        "echo",
        "max",
@@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
        "avg",
        "tagg"
    };
+    private static final String[] STRING_VALUE_TOPICS = {
+        "fk"
+    };
+    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+    static {
+        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+    }

    private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
                stringSerde.serializer().serialize("", key),
                intSerde.serializer().serialize("", value)
            );
            producer.send(record);
+            final ProducerRecord<byte[], byte[]> fkRecord =
+                new ProducerRecord<>(
+                    "fk",
+                    intSerde.serializer().serialize("", value),
+                    stringSerde.serializer().serialize("", key)
+                );
+            producer.send(fkRecord);

            numRecordsProduced++;
            if (numRecordsProduced % 100 == 0) {
                System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
        final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;

-        List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();

        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            while (remaining > 0) {
@@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
                    remaining--;
                    data[index] = data[remaining];
                } else {
                    final ProducerRecord<byte[], byte[]> record =
                        new ProducerRecord<>(
                            "data",
                            stringSerde.serializer().serialize("", key),
                            intSerde.serializer().serialize("", value)
                        );
-                    producer.send(record, new TestCallback(record, needRetry));
+                    producer.send(record, new TestCallback(record, dataNeedRetry));
+
+                    final ProducerRecord<byte[], byte[]> fkRecord =
+                        new ProducerRecord<>(
+                            "fk",
+                            intSerde.serializer().serialize("", value),
+                            stringSerde.serializer().serialize("", key)
+                        );
+                    producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));

                    numRecordsProduced++;
                    allData.get(key).add(value);
@@ -195,21 +218,19 @@ public class SmokeTestDriver extends SmokeTestUtil {
            }
            producer.flush();

-            int remainingRetries = 5;
-            while (!needRetry.isEmpty()) {
-                final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
-                for (final ProducerRecord<byte[], byte[]> record : needRetry) {
-                    System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
-                    producer.send(record, new TestCallback(record, needRetry2));
-                }
-                producer.flush();
-                needRetry = needRetry2;
-
-                if (--remainingRetries == 0 && !needRetry.isEmpty()) {
-                    System.err.println("Failed to produce all records after multiple retries");
-                    Exit.exit(1);
-                }
-            }
+            retry(producer, dataNeedRetry, stringSerde);
+            retry(producer, fkNeedRetry, intSerde);
+
+            flush(producer,
+                "data",
+                stringSerde.serializer().serialize("", "flush"),
+                intSerde.serializer().serialize("", 0)
+            );
+            flush(producer,
+                "fk",
+                intSerde.serializer().serialize("", 0),
+                stringSerde.serializer().serialize("", "flush")
+            );

            // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
            // all suppressed records.
@@ -227,6 +248,44 @@ public class SmokeTestDriver extends SmokeTestUtil {
        return Collections.unmodifiableMap(allData);
    }

+    private static void retry(final KafkaProducer<byte[], byte[]> producer,
+                              List<ProducerRecord<byte[], byte[]>> needRetry,
+                              final Serde<?> keySerde) {
+        int remainingRetries = 5;
+        while (!needRetry.isEmpty()) {
+            final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+            for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+                System.out.println(
+                    "retry producing " + keySerde.deserializer().deserialize("", record.key()));
+                producer.send(record, new TestCallback(record, needRetry2));
+            }
+            producer.flush();
+            needRetry = needRetry2;
+
+            if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+                System.err.println("Failed to produce all records after multiple retries");
+                Exit.exit(1);
+            }
+        }
+    }
+
+    private static void flush(final KafkaProducer<byte[], byte[]> producer,
+                              final String topic,
+                              final byte[] keyBytes,
+                              final byte[] valBytes) {
+        // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+        // all suppressed records.
+        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+        for (final PartitionInfo partition : partitions) {
+            producer.send(new ProducerRecord<>(
+                partition.topic(),
+                partition.partition(),
+                System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+                keyBytes,
+                valBytes
+            ));
+        }
+    }
+
    private static Properties generatorProperties(final String kafka) {
        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
@@ -315,14 +374,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);

-        final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
        int recordsProcessed = 0;
        final Map<String, AtomicInteger> processed =
-            Stream.of(TOPICS)
+            Stream.of(NUMERIC_VALUE_TOPICS)
                .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));

        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();


@ -16,11 +16,18 @@
*/ */
package org.apache.kafka.streams.tests; package org.apache.kafka.streams.tests;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
import java.util.Random;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier;
@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
System.out.println("props=" + streamsProperties); System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final KStream dataStream = builder.stream("data"); final KTable<String, Integer> dataTable = builder.table(
dataStream.process(printProcessorSupplier()); "data", Consumed.with(stringSerde, intSerde));
final KStream<String, Integer> dataStream = dataTable.toStream();
dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo"); dataStream.to("echo");
final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
"test.run_fk_join",
"false"));
if (runFkJoin) {
try {
final KTable<Integer, String> fkTable = builder.table(
"fk", Consumed.with(intSerde, stringSerde));
buildFKTable(dataTable, fkTable);
} catch (final Exception e) {
System.err.println("Caught " + e.getMessage());
}
}
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
config.putAll(streamsProperties); config.putAll(streamsProperties);
@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
})); }));
} }
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() { private static void buildFKTable(final KTable<String, Integer> primaryTable,
final KTable<Integer, String> otherTable) {
final KStream<String, String> kStream = primaryTable
.join(otherTable, v -> v, (k0, v0) -> v0)
.toStream();
kStream.process(printProcessorSupplier("fk"));
kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
}
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
return () -> new AbstractProcessor<K, V>() { return () -> new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0; private int numRecordsProcessed = 0;
@Override @Override
public void init(final ProcessorContext context) { public void init(final ProcessorContext context) {
System.out.println("[2.7] initializing processor: topic=data taskId=" + context.taskId()); System.out.println("[2.7] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0; numRecordsProcessed = 0;
} }
@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
public void process(final K key, final V value) { public void process(final K key, final V value) {
numRecordsProcessed++; numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) { if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data"); System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
} }
} }
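
For readers following the FK-join topology added above: the smoke-test generator writes ("A", 1) to the "data" topic and the inverted record (1, "A") to the "fk" topic, so buildFKTable extracts the foreign key from the primary value (v -> v), looks it up in the fk table, and re-emits the foreign value under the primary key. The following is a minimal, self-contained sketch of that record flow using TopologyTestDriver; the class name and config values are illustrative assumptions, and only the "data", "fk", and "fk-result" topic names are taken from the patch.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class FkJoinTopologySketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // primary table, keyed like the "data" topic: String key -> Integer value
        final KTable<String, Integer> dataTable = builder.table(
            "data", Consumed.with(Serdes.String(), Serdes.Integer()));
        // foreign table, keyed like the "fk" topic: Integer key -> String value
        final KTable<Integer, String> fkTable = builder.table(
            "fk", Consumed.with(Serdes.Integer(), Serdes.String()));
        // same shape as buildFKTable above: the primary value is the foreign key,
        // and the joiner keeps the foreign value
        dataTable.join(fkTable, v -> v, (primaryValue, foreignValue) -> foreignValue)
                 .toStream()
                 .to("fk-result", Produced.with(Serdes.String(), Serdes.String()));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fk-join-sketch");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // not used to connect by the test driver
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<Integer, String> fk = driver.createInputTopic(
                "fk", Serdes.Integer().serializer(), Serdes.String().serializer());
            final TestInputTopic<String, Integer> data = driver.createInputTopic(
                "data", Serdes.String().serializer(), Serdes.Integer().serializer());
            final TestOutputTopic<String, String> result = driver.createOutputTopic(
                "fk-result", Serdes.String().deserializer(), Serdes.String().deserializer());

            fk.pipeInput(1, "A");    // foreign side, mirrors the generator's inverted record
            data.pipeInput("A", 1);  // primary side, value 1 is the foreign key
            System.out.println(result.readKeyValue());  // expected: KeyValue(A, A)
        }
    }
}

In the upgrade test itself the joined stream is simply piped to "fk-result" and observed through the "processed ... records from topic=fk" log line; the sketch only illustrates the key/value orientation the driver relies on.
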

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil { public class SmokeTestDriver extends SmokeTestUtil {
private static final String[] TOPICS = { private static final String[] NUMERIC_VALUE_TOPICS = {
"data", "data",
"echo", "echo",
"max", "max",
@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
"avg", "avg",
"tagg" "tagg"
}; };
private static final String[] STRING_VALUE_TOPICS = {
"fk"
};
private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
static {
System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
}
private static final int MAX_RECORD_EMPTY_RETRIES = 30; private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
stringSerde.serializer().serialize("", key), stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value) intSerde.serializer().serialize("", value)
); );
producer.send(record); producer.send(record);
final ProducerRecord<byte[], byte[]> fkRecord =
new ProducerRecord<>(
"fk",
intSerde.serializer().serialize("", value),
stringSerde.serializer().serialize("", key)
);
producer.send(fkRecord);
numRecordsProduced++; numRecordsProduced++;
if (numRecordsProduced % 100 == 0) { if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>(); final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) { try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) { while (remaining > 0) {
@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
remaining--; remaining--;
data[index] = data[remaining]; data[index] = data[remaining];
} else { } else {
final ProducerRecord<byte[], byte[]> record = final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>( new ProducerRecord<>(
"data", "data",
stringSerde.serializer().serialize("", key), stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value) intSerde.serializer().serialize("", value)
); );
producer.send(record, new TestCallback(record, dataNeedRetry));
producer.send(record, new TestCallback(record, needRetry)); final ProducerRecord<byte[], byte[]> fkRecord =
new ProducerRecord<>(
"fk",
intSerde.serializer().serialize("", value),
stringSerde.serializer().serialize("", key)
);
producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++; numRecordsProduced++;
allData.get(key).add(value); allData.get(key).add(value);
@ -195,38 +218,61 @@ public class SmokeTestDriver extends SmokeTestUtil {
} }
producer.flush(); producer.flush();
int remainingRetries = 5; retry(producer, dataNeedRetry, stringSerde);
while (!needRetry.isEmpty()) { retry(producer, fkNeedRetry, intSerde);
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
for (final ProducerRecord<byte[], byte[]> record : needRetry) {
System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
producer.send(record, new TestCallback(record, needRetry2));
}
producer.flush();
needRetry = needRetry2;
if (--remainingRetries == 0 && !needRetry.isEmpty()) { flush(producer,
System.err.println("Failed to produce all records after multiple retries"); "data",
Exit.exit(1); stringSerde.serializer().serialize("", "flush"),
} intSerde.serializer().serialize("", 0)
} );
flush(producer,
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out "fk",
// all suppressed records. intSerde.serializer().serialize("", 0),
final List<PartitionInfo> partitions = producer.partitionsFor("data"); stringSerde.serializer().serialize("", "flush")
for (final PartitionInfo partition : partitions) { );
producer.send(new ProducerRecord<>(
partition.topic(),
partition.partition(),
System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
stringSerde.serializer().serialize("", "flush"),
intSerde.serializer().serialize("", 0)
));
}
} }
return Collections.unmodifiableMap(allData); return Collections.unmodifiableMap(allData);
} }
private static void retry(final KafkaProducer<byte[], byte[]> producer,
List<ProducerRecord<byte[], byte[]>> needRetry,
final Serde<?> keySerde) {
int remainingRetries = 5;
while (!needRetry.isEmpty()) {
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
for (final ProducerRecord<byte[], byte[]> record : needRetry) {
System.out.println(
"retry producing " + keySerde.deserializer().deserialize("", record.key()));
producer.send(record, new TestCallback(record, needRetry2));
}
producer.flush();
needRetry = needRetry2;
if (--remainingRetries == 0 && !needRetry.isEmpty()) {
System.err.println("Failed to produce all records after multiple retries");
Exit.exit(1);
}
}
}
private static void flush(final KafkaProducer<byte[], byte[]> producer,
final String topic,
final byte[] keyBytes,
final byte[] valBytes) {
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
// all suppressed records.
final List<PartitionInfo> partitions = producer.partitionsFor(topic);
for (final PartitionInfo partition : partitions) {
producer.send(new ProducerRecord<>(
partition.topic(),
partition.partition(),
System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
keyBytes,
valBytes
));
}
}
private static Properties generatorProperties(final String kafka) { private static Properties generatorProperties(final String kafka) {
final Properties producerProps = new Properties(); final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props); final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS); final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions); consumer.assign(partitions);
consumer.seekToBeginning(partitions); consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey; final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0; int recordsProcessed = 0;
final Map<String, AtomicInteger> processed = final Map<String, AtomicInteger> processed =
Stream.of(TOPICS) Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>(); final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();

View File

@ -16,11 +16,18 @@
*/ */
package org.apache.kafka.streams.tests; package org.apache.kafka.streams.tests;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
import java.util.Random;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier;
@ -42,12 +49,29 @@ public class StreamsUpgradeTest {
System.out.println("props=" + streamsProperties); System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final KStream dataStream = builder.stream("data"); final KTable<String, Integer> dataTable = builder.table(
dataStream.process(printProcessorSupplier()); "data", Consumed.with(stringSerde, intSerde));
final KStream<String, Integer> dataStream = dataTable.toStream();
dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo"); dataStream.to("echo");
final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
"test.run_fk_join",
"false"));
if (runFkJoin) {
try {
final KTable<Integer, String> fkTable = builder.table(
"fk", Consumed.with(intSerde, stringSerde));
buildFKTable(dataTable, fkTable);
} catch (final Exception e) {
System.err.println("Caught " + e.getMessage());
}
}
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties); config.putAll(streamsProperties);
@ -61,13 +85,22 @@ public class StreamsUpgradeTest {
})); }));
} }
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() { private static void buildFKTable(final KTable<String, Integer> primaryTable,
final KTable<Integer, String> otherTable) {
final KStream<String, String> kStream = primaryTable
.join(otherTable, v -> v, (k0, v0) -> v0)
.toStream();
kStream.process(printProcessorSupplier("fk"));
kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
}
private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
return () -> new AbstractProcessor<K, V>() { return () -> new AbstractProcessor<K, V>() {
private int numRecordsProcessed = 0; private int numRecordsProcessed = 0;
@Override @Override
public void init(final ProcessorContext context) { public void init(final ProcessorContext context) {
System.out.println("[2.8] initializing processor: topic=data taskId=" + context.taskId()); System.out.println("[2.8] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0; numRecordsProcessed = 0;
} }
@ -75,7 +108,7 @@ public class StreamsUpgradeTest {
public void process(final K key, final V value) { public void process(final K key, final V value) {
numRecordsProcessed++; numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) { if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data"); System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
} }
} }

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil { public class SmokeTestDriver extends SmokeTestUtil {
private static final String[] TOPICS = { private static final String[] NUMERIC_VALUE_TOPICS = {
"data", "data",
"echo", "echo",
"max", "max",
@ -72,6 +73,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
"avg", "avg",
"tagg" "tagg"
}; };
private static final String[] STRING_VALUE_TOPICS = {
"fk"
};
private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
static {
System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
}
private static final int MAX_RECORD_EMPTY_RETRIES = 30; private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@ -130,9 +139,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
stringSerde.serializer().serialize("", key), stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value) intSerde.serializer().serialize("", value)
); );
producer.send(record); producer.send(record);
final ProducerRecord<byte[], byte[]> fkRecord =
new ProducerRecord<>(
"fk",
intSerde.serializer().serialize("", value),
stringSerde.serializer().serialize("", key)
);
producer.send(fkRecord);
numRecordsProduced++; numRecordsProduced++;
if (numRecordsProduced % 100 == 0) { if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>(); final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) { try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) { while (remaining > 0) {
@ -175,15 +192,21 @@ public class SmokeTestDriver extends SmokeTestUtil {
remaining--; remaining--;
data[index] = data[remaining]; data[index] = data[remaining];
} else { } else {
final ProducerRecord<byte[], byte[]> record = final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>( new ProducerRecord<>(
"data", "data",
stringSerde.serializer().serialize("", key), stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value) intSerde.serializer().serialize("", value)
); );
producer.send(record, new TestCallback(record, dataNeedRetry));
producer.send(record, new TestCallback(record, needRetry)); final ProducerRecord<byte[], byte[]> fkRecord =
new ProducerRecord<>(
"fk",
intSerde.serializer().serialize("", value),
stringSerde.serializer().serialize("", key)
);
producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++; numRecordsProduced++;
allData.get(key).add(value); allData.get(key).add(value);
@ -195,38 +218,61 @@ public class SmokeTestDriver extends SmokeTestUtil {
} }
producer.flush(); producer.flush();
int remainingRetries = 5; retry(producer, dataNeedRetry, stringSerde);
while (!needRetry.isEmpty()) { retry(producer, fkNeedRetry, intSerde);
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
for (final ProducerRecord<byte[], byte[]> record : needRetry) {
System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
producer.send(record, new TestCallback(record, needRetry2));
}
producer.flush();
needRetry = needRetry2;
if (--remainingRetries == 0 && !needRetry.isEmpty()) { flush(producer,
System.err.println("Failed to produce all records after multiple retries"); "data",
Exit.exit(1); stringSerde.serializer().serialize("", "flush"),
} intSerde.serializer().serialize("", 0)
} );
flush(producer,
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out "fk",
// all suppressed records. intSerde.serializer().serialize("", 0),
final List<PartitionInfo> partitions = producer.partitionsFor("data"); stringSerde.serializer().serialize("", "flush")
for (final PartitionInfo partition : partitions) { );
producer.send(new ProducerRecord<>(
partition.topic(),
partition.partition(),
System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
stringSerde.serializer().serialize("", "flush"),
intSerde.serializer().serialize("", 0)
));
}
} }
return Collections.unmodifiableMap(allData); return Collections.unmodifiableMap(allData);
} }
private static void retry(final KafkaProducer<byte[], byte[]> producer,
List<ProducerRecord<byte[], byte[]>> needRetry,
final Serde<?> keySerde) {
int remainingRetries = 5;
while (!needRetry.isEmpty()) {
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
for (final ProducerRecord<byte[], byte[]> record : needRetry) {
System.out.println(
"retry producing " + keySerde.deserializer().deserialize("", record.key()));
producer.send(record, new TestCallback(record, needRetry2));
}
producer.flush();
needRetry = needRetry2;
if (--remainingRetries == 0 && !needRetry.isEmpty()) {
System.err.println("Failed to produce all records after multiple retries");
Exit.exit(1);
}
}
}
private static void flush(final KafkaProducer<byte[], byte[]> producer,
final String topic,
final byte[] keyBytes,
final byte[] valBytes) {
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
// all suppressed records.
final List<PartitionInfo> partitions = producer.partitionsFor(topic);
for (final PartitionInfo partition : partitions) {
producer.send(new ProducerRecord<>(
partition.topic(),
partition.partition(),
System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
keyBytes,
valBytes
));
}
}
private static Properties generatorProperties(final String kafka) { private static Properties generatorProperties(final String kafka) {
final Properties producerProps = new Properties(); final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
@ -315,14 +361,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props); final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS); final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions); consumer.assign(partitions);
consumer.seekToBeginning(partitions); consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey; final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0; int recordsProcessed = 0;
final Map<String, AtomicInteger> processed = final Map<String, AtomicInteger> processed =
Stream.of(TOPICS) Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>(); final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();

View File

@ -16,11 +16,18 @@
*/ */
package org.apache.kafka.streams.tests; package org.apache.kafka.streams.tests;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
import java.util.Random;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@ -44,12 +51,29 @@ public class StreamsUpgradeTest {
System.out.println("props=" + streamsProperties); System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final KStream dataStream = builder.stream("data"); final KTable<String, Integer> dataTable = builder.table(
dataStream.process(printProcessorSupplier()); "data", Consumed.with(stringSerde, intSerde));
final KStream<String, Integer> dataStream = dataTable.toStream();
dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo"); dataStream.to("echo");
final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
"test.run_fk_join",
"false"));
if (runFkJoin) {
try {
final KTable<Integer, String> fkTable = builder.table(
"fk", Consumed.with(intSerde, stringSerde));
buildFKTable(dataTable, fkTable);
} catch (final Exception e) {
System.err.println("Caught " + e.getMessage());
}
}
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties); config.putAll(streamsProperties);
@ -63,13 +87,22 @@ public class StreamsUpgradeTest {
})); }));
} }
private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier() { private static void buildFKTable(final KTable<String, Integer> primaryTable,
final KTable<Integer, String> otherTable) {
final KStream<String, String> kStream = primaryTable
.join(otherTable, v -> v, (k0, v0) -> v0)
.toStream();
kStream.process(printProcessorSupplier("fk"));
kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
}
private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier(final String topic) {
return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() { return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() {
private int numRecordsProcessed = 0; private int numRecordsProcessed = 0;
@Override @Override
public void init(final ProcessorContext<KOut, VOut> context) { public void init(final ProcessorContext<KOut, VOut> context) {
System.out.println("[3.0] initializing processor: topic=data taskId=" + context.taskId()); System.out.println("[3.0] initializing processor: topic=" + topic + " taskId=" + context.taskId());
numRecordsProcessed = 0; numRecordsProcessed = 0;
} }
@ -77,7 +110,7 @@ public class StreamsUpgradeTest {
public void process(final Record<KIn, VIn> record) { public void process(final Record<KIn, VIn> record) {
numRecordsProcessed++; numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) { if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data"); System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
} }
} }

View File

@ -30,6 +30,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
@ -60,7 +61,7 @@ import static java.util.Collections.emptyMap;
import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkEntry;
public class SmokeTestDriver extends SmokeTestUtil { public class SmokeTestDriver extends SmokeTestUtil {
private static final String[] TOPICS = { private static final String[] NUMERIC_VALUE_TOPICS = {
"data", "data",
"echo", "echo",
"max", "max",
@ -72,6 +73,15 @@ public class SmokeTestDriver extends SmokeTestUtil {
"avg", "avg",
"tagg" "tagg"
}; };
private static final String[] STRING_VALUE_TOPICS = {
"fk"
};
private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
static {
System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
}
private static final int MAX_RECORD_EMPTY_RETRIES = 30; private static final int MAX_RECORD_EMPTY_RETRIES = 30;
@ -130,9 +140,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
stringSerde.serializer().serialize("", key), stringSerde.serializer().serialize("", key),
intSerde.serializer().serialize("", value) intSerde.serializer().serialize("", value)
); );
producer.send(record); producer.send(record);
final ProducerRecord<byte[], byte[]> fkRecord =
new ProducerRecord<>(
"fk",
intSerde.serializer().serialize("", value),
stringSerde.serializer().serialize("", key)
);
producer.send(fkRecord);
numRecordsProduced++; numRecordsProduced++;
if (numRecordsProduced % 100 == 0) { if (numRecordsProduced % 100 == 0) {
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced"); System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
@ -148,7 +165,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
final Duration timeToSpend) { final Duration timeToSpend) {
final Properties producerProps = generatorProperties(kafka); final Properties producerProps = generatorProperties(kafka);
int numRecordsProduced = 0; int numRecordsProduced = 0;
final Map<String, Set<Integer>> allData = new HashMap<>(); final Map<String, Set<Integer>> allData = new HashMap<>();
@ -163,7 +179,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey; final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>(); final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) { try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
while (remaining > 0) { while (remaining > 0) {
@ -175,7 +192,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
remaining--; remaining--;
data[index] = data[remaining]; data[index] = data[remaining];
} else { } else {
final ProducerRecord<byte[], byte[]> record = final ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>( new ProducerRecord<>(
"data", "data",
@ -183,7 +199,16 @@ public class SmokeTestDriver extends SmokeTestUtil {
intSerde.serializer().serialize("", value) intSerde.serializer().serialize("", value)
); );
producer.send(record, new TestCallback(record, needRetry)); producer.send(record, new TestCallback(record, dataNeedRetry));
final ProducerRecord<byte[], byte[]> fkRecord =
new ProducerRecord<>(
"fk",
intSerde.serializer().serialize("", value),
stringSerde.serializer().serialize("", key)
);
producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
numRecordsProduced++; numRecordsProduced++;
allData.get(key).add(value); allData.get(key).add(value);
@ -195,38 +220,61 @@ public class SmokeTestDriver extends SmokeTestUtil {
} }
producer.flush(); producer.flush();
int remainingRetries = 5; retry(producer, dataNeedRetry, stringSerde);
while (!needRetry.isEmpty()) { retry(producer, fkNeedRetry, intSerde);
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
for (final ProducerRecord<byte[], byte[]> record : needRetry) {
System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
producer.send(record, new TestCallback(record, needRetry2));
}
producer.flush();
needRetry = needRetry2;
if (--remainingRetries == 0 && !needRetry.isEmpty()) { flush(producer,
System.err.println("Failed to produce all records after multiple retries"); "data",
Exit.exit(1); stringSerde.serializer().serialize("", "flush"),
} intSerde.serializer().serialize("", 0)
} );
flush(producer,
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out "fk",
// all suppressed records. intSerde.serializer().serialize("", 0),
final List<PartitionInfo> partitions = producer.partitionsFor("data"); stringSerde.serializer().serialize("", "flush")
for (final PartitionInfo partition : partitions) { );
producer.send(new ProducerRecord<>(
partition.topic(),
partition.partition(),
System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
stringSerde.serializer().serialize("", "flush"),
intSerde.serializer().serialize("", 0)
));
}
} }
return Collections.unmodifiableMap(allData); return Collections.unmodifiableMap(allData);
} }
private static void retry(final KafkaProducer<byte[], byte[]> producer,
List<ProducerRecord<byte[], byte[]>> needRetry,
final Serde<?> keySerde) {
int remainingRetries = 5;
while (!needRetry.isEmpty()) {
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
for (final ProducerRecord<byte[], byte[]> record : needRetry) {
System.out.println(
"retry producing " + keySerde.deserializer().deserialize("", record.key()));
producer.send(record, new TestCallback(record, needRetry2));
}
producer.flush();
needRetry = needRetry2;
if (--remainingRetries == 0 && !needRetry.isEmpty()) {
System.err.println("Failed to produce all records after multiple retries");
Exit.exit(1);
}
}
}
private static void flush(final KafkaProducer<byte[], byte[]> producer,
final String topic,
final byte[] keyBytes,
final byte[] valBytes) {
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
// all suppressed records.
final List<PartitionInfo> partitions = producer.partitionsFor(topic);
for (final PartitionInfo partition : partitions) {
producer.send(new ProducerRecord<>(
partition.topic(),
partition.partition(),
System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
keyBytes,
valBytes
));
}
}
private static Properties generatorProperties(final String kafka) { private static Properties generatorProperties(final String kafka) {
final Properties producerProps = new Properties(); final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
@ -315,14 +363,14 @@ public class SmokeTestDriver extends SmokeTestUtil {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props); final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS); final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
consumer.assign(partitions); consumer.assign(partitions);
consumer.seekToBeginning(partitions); consumer.seekToBeginning(partitions);
final int recordsGenerated = inputs.size() * maxRecordsPerKey; final int recordsGenerated = inputs.size() * maxRecordsPerKey;
int recordsProcessed = 0; int recordsProcessed = 0;
final Map<String, AtomicInteger> processed = final Map<String, AtomicInteger> processed =
Stream.of(TOPICS) Stream.of(NUMERIC_VALUE_TOPICS)
.collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0))); .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>(); final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();

View File

@ -16,11 +16,18 @@
*/ */
package org.apache.kafka.streams.tests; package org.apache.kafka.streams.tests;
import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
import java.util.Random;
import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@ -44,12 +51,29 @@ public class StreamsUpgradeTest {
System.out.println("props=" + streamsProperties); System.out.println("props=" + streamsProperties);
final StreamsBuilder builder = new StreamsBuilder(); final StreamsBuilder builder = new StreamsBuilder();
final KStream dataStream = builder.stream("data"); final KTable<String, Integer> dataTable = builder.table(
dataStream.process(printProcessorSupplier()); "data", Consumed.with(stringSerde, intSerde));
final KStream<String, Integer> dataStream = dataTable.toStream();
dataStream.process(printProcessorSupplier("data"));
dataStream.to("echo"); dataStream.to("echo");
final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
"test.run_fk_join",
"false"));
if (runFkJoin) {
try {
final KTable<Integer, String> fkTable = builder.table(
"fk", Consumed.with(intSerde, stringSerde));
buildFKTable(dataStream, fkTable);
} catch (final Exception e) {
System.err.println("Caught " + e.getMessage());
}
}
final Properties config = new Properties(); final Properties config = new Properties();
config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest"); config.setProperty(
StreamsConfig.APPLICATION_ID_CONFIG,
"StreamsUpgradeTest-" + new Random().nextLong());
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.putAll(streamsProperties); config.putAll(streamsProperties);
@ -63,13 +87,22 @@ public class StreamsUpgradeTest {
})); }));
} }
private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier() { private static void buildFKTable(final KStream<String, Integer> primaryTable,
final KTable<Integer, String> otherTable) {
final KStream<String, String> kStream = primaryTable.toTable()
.join(otherTable, v -> v, (k0, v0) -> v0)
.toStream();
kStream.process(printProcessorSupplier("fk"));
kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
}
private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier(final String topic) {
return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() { return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() {
private int numRecordsProcessed = 0; private int numRecordsProcessed = 0;
@Override @Override
public void init(final ProcessorContext<KOut, VOut> context) { public void init(final ProcessorContext<KOut, VOut> context) {
System.out.println("[3.1] initializing processor: topic=data taskId=" + context.taskId()); System.out.println("[3.1] initializing processor: topic=" + topic + "taskId=" + context.taskId());
numRecordsProcessed = 0; numRecordsProcessed = 0;
} }
@ -77,7 +110,7 @@ public class StreamsUpgradeTest {
public void process(final Record<KIn, VIn> record) { public void process(final Record<KIn, VIn> record) {
numRecordsProcessed++; numRecordsProcessed++;
if (numRecordsProcessed % 100 == 0) { if (numRecordsProcessed % 100 == 0) {
System.out.println("processed " + numRecordsProcessed + " records from topic=data"); System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
} }
} }

View File

@ -22,7 +22,7 @@ from ducktape.utils.util import wait_until
from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
from kafkatest.services.kafka import KafkaConfig from kafkatest.services.kafka import KafkaConfig
from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1 from kafkatest.version import KafkaVersion, LATEST_0_10_0, LATEST_0_10_1
STATE_DIR = "state.dir" STATE_DIR = "state.dir"
@ -616,6 +616,9 @@ class StreamsUpgradeTestJobRunnerService(StreamsTestBaseService):
if self.UPGRADE_FROM is not None: if self.UPGRADE_FROM is not None:
properties['upgrade.from'] = self.UPGRADE_FROM properties['upgrade.from'] = self.UPGRADE_FROM
if (self.UPGRADE_FROM is not None and KafkaVersion(self.UPGRADE_FROM).supports_fk_joins()) or \
(self.KAFKA_STREAMS_VERSION is not None and KafkaVersion(self.KAFKA_STREAMS_VERSION).supports_fk_joins()):
properties['test.run_fk_join'] = "true"
if self.UPGRADE_TO == "future_version": if self.UPGRADE_TO == "future_version":
properties['test.future.metadata'] = "any_value" properties['test.future.metadata'] = "any_value"

View File

@ -37,6 +37,8 @@ broker_upgrade_versions = [str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1),
metadata_1_versions = [str(LATEST_0_10_0)] metadata_1_versions = [str(LATEST_0_10_0)]
metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
str(LATEST_3_0), str(LATEST_3_1)]
""" """
After each release one should first check that the released version has been uploaded to After each release one should first check that the released version has been uploaded to
@ -86,9 +88,11 @@ class StreamsUpgradeTest(Test):
self.topics = { self.topics = {
'echo' : { 'partitions': 5 }, 'echo' : { 'partitions': 5 },
'data' : { 'partitions': 5 }, 'data' : { 'partitions': 5 },
'fk' : { 'partitions': 5 },
} }
processed_msg = "processed [0-9]* records" processed_data_msg = "processed [0-9]* records from topic=data"
processed_fk_msg = "processed [0-9]* records from topic=fk"
base_version_number = str(DEV_VERSION).split("-")[0] base_version_number = str(DEV_VERSION).split("-")[0]
def perform_broker_upgrade(self, to_version): def perform_broker_upgrade(self, to_version):
@ -159,9 +163,9 @@ class StreamsUpgradeTest(Test):
with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
processor.start() processor.start()
monitor.wait_until(self.processed_msg, monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(processor.node)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(processor.node))
connected_message = "Discovered group coordinator" connected_message = "Discovered group coordinator"
with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor: with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
@ -172,9 +176,9 @@ class StreamsUpgradeTest(Test):
timeout_sec=120, timeout_sec=120,
err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account)) err_msg=("Never saw output '%s' on " % connected_message) + str(processor.node.account))
stdout_monitor.wait_until(self.processed_msg, stdout_monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on" % self.processed_msg + str(processor.node.account)) err_msg="Never saw output '%s' on" % self.processed_data_msg + str(processor.node.account))
# SmokeTestDriver allows up to 6 minutes to consume all # SmokeTestDriver allows up to 6 minutes to consume all
# records for the verification step so this timeout is set to # records for the verification step so this timeout is set to
@ -192,8 +196,12 @@ class StreamsUpgradeTest(Test):
@cluster(num_nodes=6) @cluster(num_nodes=6)
@matrix(from_version=metadata_1_versions, to_version=[str(DEV_VERSION)]) @matrix(from_version=metadata_1_versions, to_version=[str(DEV_VERSION)])
@matrix(from_version=metadata_2_versions, to_version=[str(DEV_VERSION)]) @matrix(from_version=metadata_2_versions, to_version=[str(DEV_VERSION)])
def test_metadata_upgrade(self, from_version, to_version): @matrix(from_version=fk_join_versions, to_version=[str(DEV_VERSION)])
def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
""" """
This test verifies that the cluster upgrades successfully even though both the Streams rebalance
metadata version and the FK-join protocol change across the upgrade.
Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version> Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
""" """
@ -311,9 +319,14 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str, log_monitor.wait_until(kafka_version_str,
timeout_sec=60, timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account)) err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account))
monitor.wait_until(self.processed_msg, monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account))
if KafkaVersion(version).supports_fk_joins():
monitor.wait_until(self.processed_fk_msg,
timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node1.account))
# start second with <version> # start second with <version>
self.prepare_for(self.processor2, version) self.prepare_for(self.processor2, version)
@ -325,12 +338,16 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str, log_monitor.wait_until(kafka_version_str,
timeout_sec=60, timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account)) err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account))
first_monitor.wait_until(self.processed_msg, first_monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account))
second_monitor.wait_until(self.processed_msg, second_monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node2.account))
if KafkaVersion(version).supports_fk_joins():
second_monitor.wait_until(self.processed_fk_msg,
timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node2.account))
# start third with <version> # start third with <version>
self.prepare_for(self.processor3, version) self.prepare_for(self.processor3, version)
@ -343,15 +360,19 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str, log_monitor.wait_until(kafka_version_str,
timeout_sec=60, timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account)) err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account))
first_monitor.wait_until(self.processed_msg, first_monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node1.account))
second_monitor.wait_until(self.processed_msg, second_monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node2.account))
third_monitor.wait_until(self.processed_msg, third_monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node3.account))
if KafkaVersion(version).supports_fk_joins():
third_monitor.wait_until(self.processed_fk_msg,
timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_fk_msg + str(node2.account))
@staticmethod @staticmethod
def prepare_for(processor, version): def prepare_for(processor, version):
@ -381,12 +402,12 @@ class StreamsUpgradeTest(Test):
with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor: with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor: with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
processor.stop() processor.stop()
first_other_monitor.wait_until(self.processed_msg, first_other_monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(first_other_node.account))
second_other_monitor.wait_until(self.processed_msg, second_other_monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(second_other_node.account))
node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False) node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
if upgrade_from is None: # upgrade disabled -- second round of rolling bounces if upgrade_from is None: # upgrade disabled -- second round of rolling bounces
@ -414,23 +435,23 @@ class StreamsUpgradeTest(Test):
log_monitor.wait_until(kafka_version_str, log_monitor.wait_until(kafka_version_str,
timeout_sec=60, timeout_sec=60,
err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account)) err_msg="Could not detect Kafka Streams version " + new_version + " on " + str(node.account))
first_other_monitor.wait_until(self.processed_msg, first_other_monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(first_other_node.account)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(first_other_node.account))
found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True)) found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
if len(found) > 0: if len(found) > 0:
raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'") raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
second_other_monitor.wait_until(self.processed_msg, second_other_monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(second_other_node.account)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(second_other_node.account))
found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True)) found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
if len(found) > 0: if len(found) > 0:
raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'") raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
monitor.wait_until(self.processed_msg, monitor.wait_until(self.processed_data_msg,
timeout_sec=60, timeout_sec=60,
err_msg="Never saw output '%s' on " % self.processed_msg + str(node.account)) err_msg="Never saw output '%s' on " % self.processed_data_msg + str(node.account))
def do_rolling_bounce(self, processor, counter, current_generation): def do_rolling_bounce(self, processor, counter, current_generation):

View File

@ -106,6 +106,9 @@ class KafkaVersion(LooseVersion):
# Self-managed clusters always support topic ID, so this method only applies to ZK clusters. # Self-managed clusters always support topic ID, so this method only applies to ZK clusters.
return self >= V_2_8_0 return self >= V_2_8_0
def supports_fk_joins(self):
return hasattr(self, "version") and self >= V_2_4_0
def get_version(node=None): def get_version(node=None):
"""Return the version attached to the given node. """Return the version attached to the given node.
Default to DEV_BRANCH if node or node.version is undefined (aka None) Default to DEV_BRANCH if node or node.version is undefined (aka None)