mirror of https://github.com/apache/kafka.git
KAFKA-7944: Improve Suppress test coverage (#6382)
* add a normal windowed suppress with short windows and a short grace period * improve the smoke test so that it actually verifies the intended conditions See https://issues.apache.org/jira/browse/KAFKA-7944 Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
parent
ab7ea07f5e
commit
8e97540071
|
|
@ -18,12 +18,14 @@ package org.apache.kafka.streams.integration;
|
|||
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
|
||||
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
|
||||
import org.apache.kafka.streams.tests.SmokeTestClient;
|
||||
import org.apache.kafka.streams.tests.SmokeTestDriver;
|
||||
import org.junit.Assert;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
|
@ -53,7 +55,8 @@ public class SmokeTestDriverIntegrationTest {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
final Map<String, Set<Integer>> allData = generate(bootstrapServers, numKeys, maxRecordsPerKey, true);
|
||||
final Map<String, Set<Integer>> allData =
|
||||
generate(bootstrapServers, numKeys, maxRecordsPerKey, Duration.ofSeconds(20));
|
||||
result = verify(bootstrapServers, allData, maxRecordsPerKey);
|
||||
|
||||
} catch (final Exception ex) {
|
||||
|
|
@ -76,7 +79,7 @@ public class SmokeTestDriverIntegrationTest {
|
|||
int numClientsCreated = 0;
|
||||
final ArrayList<SmokeTestClient> clients = new ArrayList<>();
|
||||
|
||||
CLUSTER.createTopics(SmokeTestDriver.topics());
|
||||
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, SmokeTestDriver.topics());
|
||||
|
||||
final String bootstrapServers = CLUSTER.bootstrapServers();
|
||||
final Driver driver = new Driver(bootstrapServers, 10, 1000);
|
||||
|
|
|
|||
|
|
@ -49,7 +49,6 @@ import org.junit.runner.RunWith;
|
|||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
|
@ -88,13 +87,16 @@ public class SuppressionDurabilityIntegrationTest {
|
|||
private static final int COMMIT_INTERVAL = 100;
|
||||
private final boolean eosEnabled;
|
||||
|
||||
public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) {
|
||||
this.eosEnabled = eosEnabled;
|
||||
}
|
||||
|
||||
@Parameters(name = "{index}: eosEnabled={0}")
|
||||
public static Collection<Object[]> parameters() {
|
||||
return Arrays.asList(new Object[] {false}, new Object[] {true});
|
||||
return asList(
|
||||
new Object[] {false},
|
||||
new Object[] {true}
|
||||
);
|
||||
}
|
||||
|
||||
public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) {
|
||||
this.eosEnabled = eosEnabled;
|
||||
}
|
||||
|
||||
private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ import org.apache.kafka.streams.kstream.KStream;
|
|||
import org.apache.kafka.streams.kstream.KTable;
|
||||
import org.apache.kafka.streams.kstream.Materialized;
|
||||
import org.apache.kafka.streams.kstream.Produced;
|
||||
import org.apache.kafka.streams.kstream.Suppressed;
|
||||
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
|
||||
import org.apache.kafka.streams.kstream.TimeWindows;
|
||||
import org.apache.kafka.streams.kstream.Windowed;
|
||||
import org.apache.kafka.streams.state.Stores;
|
||||
|
|
@ -40,8 +40,11 @@ import org.apache.kafka.streams.state.WindowStore;
|
|||
import org.apache.kafka.test.TestUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
|
||||
|
||||
public class SmokeTestClient extends SmokeTestUtil {
|
||||
|
||||
private final String name;
|
||||
|
|
@ -113,7 +116,7 @@ public class SmokeTestClient extends SmokeTestUtil {
|
|||
final Topology build = getTopology();
|
||||
final KafkaStreams streamsClient = new KafkaStreams(build, getStreamsConfig(props));
|
||||
streamsClient.setStateListener((newState, oldState) -> {
|
||||
System.out.printf("%s: %s -> %s%n", name, oldState, newState);
|
||||
System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
|
||||
if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
|
||||
started = true;
|
||||
}
|
||||
|
|
@ -149,24 +152,22 @@ public class SmokeTestClient extends SmokeTestUtil {
|
|||
.withRetention(Duration.ofHours(25))
|
||||
);
|
||||
|
||||
minAggregation
|
||||
.toStream()
|
||||
.filterNot((k, v) -> k.key().equals("flush"))
|
||||
.map((key, value) -> new KeyValue<>(key.toString(), value))
|
||||
.to("min-raw", Produced.with(stringSerde, intSerde));
|
||||
streamify(minAggregation, "min-raw");
|
||||
|
||||
minAggregation
|
||||
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
|
||||
.toStream()
|
||||
.filterNot((k, v) -> k.key().equals("flush"))
|
||||
.map((key, value) -> new KeyValue<>(key.toString(), value))
|
||||
.to("min-suppressed", Produced.with(stringSerde, intSerde));
|
||||
streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), "min-suppressed");
|
||||
|
||||
minAggregation
|
||||
.toStream(new Unwindow<>())
|
||||
.filterNot((k, v) -> k.equals("flush"))
|
||||
.to("min", Produced.with(stringSerde, intSerde));
|
||||
|
||||
final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
|
||||
.windowedBy(TimeWindows.of(Duration.ofSeconds(2)).advanceBy(Duration.ofSeconds(1)).grace(Duration.ofSeconds(30)))
|
||||
.reduce((l, r) -> l + r);
|
||||
|
||||
streamify(smallWindowSum, "sws-raw");
|
||||
streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");
|
||||
|
||||
final KTable<String, Integer> minTable = builder.table(
|
||||
"min",
|
||||
Consumed.with(stringSerde, intSerde),
|
||||
|
|
@ -250,4 +251,12 @@ public class SmokeTestClient extends SmokeTestUtil {
|
|||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private static void streamify(final KTable<Windowed<String>, Integer> windowedTable, final String topic) {
|
||||
windowedTable
|
||||
.toStream()
|
||||
.filterNot((k, v) -> k.key().equals("flush"))
|
||||
.map((key, value) -> new KeyValue<>(key.toString(), value))
|
||||
.to(topic, Produced.with(stringSerde, intSerde));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,13 +60,14 @@ import static java.util.Collections.emptyMap;
|
|||
import static org.apache.kafka.common.utils.Utils.mkEntry;
|
||||
|
||||
public class SmokeTestDriver extends SmokeTestUtil {
|
||||
private static final String[] TOPICS = new String[] {
|
||||
private static final String[] TOPICS = {
|
||||
"data",
|
||||
"echo",
|
||||
"max",
|
||||
"min", "min-suppressed", "min-raw",
|
||||
"dif",
|
||||
"sum",
|
||||
"sws-raw", "sws-suppressed",
|
||||
"cnt",
|
||||
"avg",
|
||||
"tagg"
|
||||
|
|
@ -80,18 +81,18 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
private int index;
|
||||
|
||||
ValueList(final int min, final int max) {
|
||||
this.key = min + "-" + max;
|
||||
key = min + "-" + max;
|
||||
|
||||
this.values = new int[max - min + 1];
|
||||
for (int i = 0; i < this.values.length; i++) {
|
||||
this.values[i] = min + i;
|
||||
values = new int[max - min + 1];
|
||||
for (int i = 0; i < values.length; i++) {
|
||||
values[i] = min + i;
|
||||
}
|
||||
// We want to randomize the order of data to test not completely predictable processing order
|
||||
// However, values are also use as a timestamp of the record. (TODO: separate data and timestamp)
|
||||
// We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
|
||||
shuffle(this.values, 10);
|
||||
shuffle(values, 10);
|
||||
|
||||
this.index = 0;
|
||||
index = 0;
|
||||
}
|
||||
|
||||
int next() {
|
||||
|
|
@ -103,18 +104,50 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
return Arrays.copyOf(TOPICS, TOPICS.length);
|
||||
}
|
||||
|
||||
static void generatePerpetually(final String kafka,
|
||||
final int numKeys,
|
||||
final int maxRecordsPerKey) {
|
||||
final Properties producerProps = generatorProperties(kafka);
|
||||
|
||||
int numRecordsProduced = 0;
|
||||
|
||||
final ValueList[] data = new ValueList[numKeys];
|
||||
for (int i = 0; i < numKeys; i++) {
|
||||
data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
|
||||
}
|
||||
|
||||
final Random rand = new Random();
|
||||
|
||||
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
|
||||
while (true) {
|
||||
final int index = rand.nextInt(numKeys);
|
||||
final String key = data[index].key;
|
||||
final int value = data[index].next();
|
||||
|
||||
final ProducerRecord<byte[], byte[]> record =
|
||||
new ProducerRecord<>(
|
||||
"data",
|
||||
stringSerde.serializer().serialize("", key),
|
||||
intSerde.serializer().serialize("", value)
|
||||
);
|
||||
|
||||
producer.send(record);
|
||||
|
||||
numRecordsProduced++;
|
||||
if (numRecordsProduced % 100 == 0) {
|
||||
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
|
||||
}
|
||||
Utils.sleep(2);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, Set<Integer>> generate(final String kafka,
|
||||
final int numKeys,
|
||||
final int maxRecordsPerKey,
|
||||
final boolean autoTerminate) {
|
||||
final Properties producerProps = new Properties();
|
||||
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
|
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
|
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
|
||||
final Duration timeToSpend) {
|
||||
final Properties producerProps = generatorProperties(kafka);
|
||||
|
||||
final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
|
||||
|
||||
int numRecordsProduced = 0;
|
||||
|
||||
|
|
@ -126,75 +159,84 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
}
|
||||
final Random rand = new Random();
|
||||
|
||||
int remaining = 1; // dummy value must be positive if <autoTerminate> is false
|
||||
if (autoTerminate) {
|
||||
remaining = data.length;
|
||||
}
|
||||
int remaining = data.length;
|
||||
|
||||
final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
|
||||
|
||||
List<ProducerRecord<byte[], byte[]>> needRetry = new ArrayList<>();
|
||||
|
||||
while (remaining > 0) {
|
||||
final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
|
||||
final String key = data[index].key;
|
||||
final int value = data[index].next();
|
||||
try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
|
||||
while (remaining > 0) {
|
||||
final int index = rand.nextInt(remaining);
|
||||
final String key = data[index].key;
|
||||
final int value = data[index].next();
|
||||
|
||||
if (autoTerminate && value < 0) {
|
||||
remaining--;
|
||||
data[index] = data[remaining];
|
||||
} else {
|
||||
if (value < 0) {
|
||||
remaining--;
|
||||
data[index] = data[remaining];
|
||||
} else {
|
||||
|
||||
final ProducerRecord<byte[], byte[]> record =
|
||||
new ProducerRecord<>(
|
||||
"data",
|
||||
stringSerde.serializer().serialize("", key),
|
||||
intSerde.serializer().serialize("", value)
|
||||
);
|
||||
final ProducerRecord<byte[], byte[]> record =
|
||||
new ProducerRecord<>(
|
||||
"data",
|
||||
stringSerde.serializer().serialize("", key),
|
||||
intSerde.serializer().serialize("", value)
|
||||
);
|
||||
|
||||
producer.send(record, new TestCallback(record, needRetry));
|
||||
producer.send(record, new TestCallback(record, needRetry));
|
||||
|
||||
numRecordsProduced++;
|
||||
allData.get(key).add(value);
|
||||
if (numRecordsProduced % 100 == 0) {
|
||||
System.out.println(numRecordsProduced + " records produced");
|
||||
numRecordsProduced++;
|
||||
allData.get(key).add(value);
|
||||
if (numRecordsProduced % 100 == 0) {
|
||||
System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
|
||||
}
|
||||
Utils.sleep(Math.max(recordPauseTime, 2));
|
||||
}
|
||||
Utils.sleep(2);
|
||||
}
|
||||
}
|
||||
producer.flush();
|
||||
|
||||
int remainingRetries = 5;
|
||||
while (!needRetry.isEmpty()) {
|
||||
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
|
||||
for (final ProducerRecord<byte[], byte[]> record : needRetry) {
|
||||
System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
|
||||
producer.send(record, new TestCallback(record, needRetry2));
|
||||
}
|
||||
producer.flush();
|
||||
needRetry = needRetry2;
|
||||
|
||||
if (--remainingRetries == 0 && !needRetry.isEmpty()) {
|
||||
System.err.println("Failed to produce all records after multiple retries");
|
||||
Exit.exit(1);
|
||||
int remainingRetries = 5;
|
||||
while (!needRetry.isEmpty()) {
|
||||
final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
|
||||
for (final ProducerRecord<byte[], byte[]> record : needRetry) {
|
||||
System.out.println("retry producing " + stringSerde.deserializer().deserialize("", record.key()));
|
||||
producer.send(record, new TestCallback(record, needRetry2));
|
||||
}
|
||||
producer.flush();
|
||||
needRetry = needRetry2;
|
||||
|
||||
if (--remainingRetries == 0 && !needRetry.isEmpty()) {
|
||||
System.err.println("Failed to produce all records after multiple retries");
|
||||
Exit.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
|
||||
// all suppressed records.
|
||||
final List<PartitionInfo> partitions = producer.partitionsFor("data");
|
||||
for (final PartitionInfo partition : partitions) {
|
||||
producer.send(new ProducerRecord<>(
|
||||
partition.topic(),
|
||||
partition.partition(),
|
||||
System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
|
||||
stringSerde.serializer().serialize("", "flush"),
|
||||
intSerde.serializer().serialize("", 0)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
|
||||
// all suppressed records.
|
||||
final List<PartitionInfo> partitions = producer.partitionsFor("data");
|
||||
for (final PartitionInfo partition : partitions) {
|
||||
producer.send(new ProducerRecord<>(
|
||||
partition.topic(),
|
||||
partition.partition(),
|
||||
System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
|
||||
stringSerde.serializer().serialize("", "flush"),
|
||||
intSerde.serializer().serialize("", 0)
|
||||
));
|
||||
}
|
||||
|
||||
producer.close();
|
||||
return Collections.unmodifiableMap(allData);
|
||||
}
|
||||
|
||||
private static Properties generatorProperties(final String kafka) {
|
||||
final Properties producerProps = new Properties();
|
||||
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
|
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
|
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
|
||||
return producerProps;
|
||||
}
|
||||
|
||||
private static class TestCallback implements Callback {
|
||||
private final ProducerRecord<byte[], byte[]> originalRecord;
|
||||
private final List<ProducerRecord<byte[], byte[]>> needRetry;
|
||||
|
|
@ -232,12 +274,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
}
|
||||
|
||||
public static class NumberDeserializer implements Deserializer<Number> {
|
||||
|
||||
@Override
|
||||
public void configure(final Map<String, ?> configs, final boolean isKey) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Number deserialize(final String topic, final byte[] data) {
|
||||
final Number value;
|
||||
|
|
@ -247,6 +283,8 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
case "min":
|
||||
case "min-raw":
|
||||
case "min-suppressed":
|
||||
case "sws-raw":
|
||||
case "sws-suppressed":
|
||||
case "max":
|
||||
case "dif":
|
||||
value = intSerde.deserializer().deserialize(topic, data);
|
||||
|
|
@ -264,11 +302,6 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public static VerificationResult verify(final String kafka,
|
||||
|
|
@ -279,6 +312,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
|
||||
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
|
||||
|
||||
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
|
||||
final List<TopicPartition> partitions = getAllPartitions(consumer, TOPICS);
|
||||
|
|
@ -406,7 +440,12 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
boolean pass;
|
||||
try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
|
||||
pass = verifyTAgg(resultStream, inputs, events.get("tagg"));
|
||||
pass &= verifySuppressed(resultStream, "min-suppressed", inputs, events, SmokeTestDriver::getMin);
|
||||
pass &= verifySuppressed(resultStream, "min-suppressed", events);
|
||||
pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
|
||||
final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
|
||||
return getMin(unwindowedKey);
|
||||
});
|
||||
pass &= verifySuppressed(resultStream, "sws-suppressed", events);
|
||||
pass &= verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin);
|
||||
pass &= verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax);
|
||||
pass &= verify(resultStream, "dif", inputs, events, key -> getMax(key).intValue() - getMin(key).intValue());
|
||||
|
|
@ -457,9 +496,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
|
||||
private static boolean verifySuppressed(final PrintStream resultStream,
|
||||
@SuppressWarnings("SameParameterValue") final String topic,
|
||||
final Map<String, Set<Integer>> inputs,
|
||||
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
|
||||
final Function<String, Number> getMin) {
|
||||
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events) {
|
||||
resultStream.println("verifying suppressed " + topic);
|
||||
final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
|
||||
for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
|
||||
|
|
@ -476,14 +513,11 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
return false;
|
||||
}
|
||||
}
|
||||
return verify(resultStream, topic, inputs, events, windowedKey -> {
|
||||
final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
|
||||
return getMin.apply(unwindowedKey);
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
private static String indent(@SuppressWarnings("SameParameterValue") final String prefix,
|
||||
final LinkedList<ConsumerRecord<String, Number>> list) {
|
||||
final Iterable<ConsumerRecord<String, Number>> list) {
|
||||
final StringBuilder stringBuilder = new StringBuilder();
|
||||
for (final ConsumerRecord<String, Number> record : list) {
|
||||
stringBuilder.append(prefix).append(record).append('\n');
|
||||
|
|
@ -494,13 +528,13 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
private static Long getSum(final String key) {
|
||||
final int min = getMin(key).intValue();
|
||||
final int max = getMax(key).intValue();
|
||||
return ((long) min + (long) max) * (max - min + 1L) / 2L;
|
||||
return ((long) min + max) * (max - min + 1L) / 2L;
|
||||
}
|
||||
|
||||
private static Double getAvg(final String key) {
|
||||
final int min = getMin(key).intValue();
|
||||
final int max = getMax(key).intValue();
|
||||
return ((long) min + (long) max) / 2.0;
|
||||
return ((long) min + max) / 2.0;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -554,7 +588,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
}
|
||||
|
||||
private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
|
||||
final ArrayList<TopicPartition> partitions = new ArrayList<>();
|
||||
final List<TopicPartition> partitions = new ArrayList<>();
|
||||
|
||||
for (final String topic : topics) {
|
||||
for (final PartitionInfo info : consumer.partitionsFor(topic)) {
|
||||
|
|
|
|||
|
|
@ -20,11 +20,15 @@ import org.apache.kafka.common.utils.Utils;
|
|||
import org.apache.kafka.streams.StreamsConfig;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
|
||||
import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
|
||||
|
||||
public class StreamsSmokeTest {
|
||||
|
||||
/**
|
||||
|
|
@ -62,16 +66,23 @@ public class StreamsSmokeTest {
|
|||
final int numKeys = 10;
|
||||
final int maxRecordsPerKey = 500;
|
||||
if (disableAutoTerminate) {
|
||||
SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
|
||||
generatePerpetually(kafka, numKeys, maxRecordsPerKey);
|
||||
} else {
|
||||
final Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, true);
|
||||
// slow down data production to span 30 seconds so that system tests have time to
|
||||
// do their bounces, etc.
|
||||
final Map<String, Set<Integer>> allData =
|
||||
generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
|
||||
SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
|
||||
}
|
||||
break;
|
||||
case "process":
|
||||
// this starts a KafkaStreams client
|
||||
final SmokeTestClient client = new SmokeTestClient(UUID.randomUUID().toString());
|
||||
client.start(streamsProperties);
|
||||
// this starts the stream processing app
|
||||
new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
|
||||
break;
|
||||
case "process-eos":
|
||||
// this starts the stream processing app with EOS
|
||||
streamsProperties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
|
||||
new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
|
||||
break;
|
||||
case "close-deadlock-test":
|
||||
final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka);
|
||||
|
|
|
|||
|
|
@ -353,6 +353,10 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
|
|||
def __init__(self, test_context, kafka):
|
||||
super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process")
|
||||
|
||||
class StreamsSmokeTestEOSJobRunnerService(StreamsSmokeTestBaseService):
|
||||
def __init__(self, test_context, kafka):
|
||||
super(StreamsSmokeTestEOSJobRunnerService, self).__init__(test_context, kafka, "process-eos")
|
||||
|
||||
|
||||
class StreamsEosTestDriverService(StreamsEosTestBaseService):
|
||||
def __init__(self, test_context, kafka):
|
||||
|
|
|
|||
|
|
@ -1,75 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.mark.resource import cluster
|
||||
|
||||
from kafkatest.tests.kafka_test import KafkaTest
|
||||
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
|
||||
import time
|
||||
|
||||
|
||||
class StreamsBounceTest(KafkaTest):
|
||||
"""
|
||||
Simple test of Kafka Streams.
|
||||
"""
|
||||
|
||||
def __init__(self, test_context):
|
||||
super(StreamsBounceTest, self).__init__(test_context, num_zk=1, num_brokers=3, topics={
|
||||
'echo' : { 'partitions': 5, 'replication-factor': 2 },
|
||||
'data' : { 'partitions': 5, 'replication-factor': 2 },
|
||||
'min' : { 'partitions': 5, 'replication-factor': 2 },
|
||||
'max' : { 'partitions': 5, 'replication-factor': 2 },
|
||||
'sum' : { 'partitions': 5, 'replication-factor': 2 },
|
||||
'dif' : { 'partitions': 5, 'replication-factor': 2 },
|
||||
'cnt' : { 'partitions': 5, 'replication-factor': 2 },
|
||||
'avg' : { 'partitions': 5, 'replication-factor': 2 },
|
||||
'wcnt' : { 'partitions': 5, 'replication-factor': 2 },
|
||||
'tagg' : { 'partitions': 5, 'replication-factor': 2 }
|
||||
})
|
||||
|
||||
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
|
||||
self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
|
||||
|
||||
@cluster(num_nodes=6)
|
||||
def test_bounce(self):
|
||||
"""
|
||||
Start a smoke test client, then abort (kill -9) and restart it a few times.
|
||||
Ensure that all records are delivered.
|
||||
"""
|
||||
|
||||
self.driver.start()
|
||||
|
||||
self.processor1.start()
|
||||
|
||||
time.sleep(15)
|
||||
|
||||
self.processor1.abortThenRestart()
|
||||
|
||||
time.sleep(15)
|
||||
|
||||
# enable this after we add change log partition replicas
|
||||
#self.kafka.signal_leader("data")
|
||||
|
||||
#time.sleep(15);
|
||||
|
||||
self.processor1.abortThenRestart()
|
||||
|
||||
self.driver.wait()
|
||||
self.driver.stop()
|
||||
|
||||
self.processor1.stop()
|
||||
|
||||
node = self.driver.node
|
||||
node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
|
||||
|
|
@ -14,11 +14,11 @@
|
|||
# limitations under the License.
|
||||
|
||||
|
||||
from ducktape.mark import matrix
|
||||
from ducktape.mark.resource import cluster
|
||||
|
||||
from kafkatest.tests.kafka_test import KafkaTest
|
||||
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
|
||||
import time
|
||||
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, StreamsSmokeTestEOSJobRunnerService
|
||||
|
||||
|
||||
class StreamsSmokeTest(KafkaTest):
|
||||
|
|
@ -31,8 +31,12 @@ class StreamsSmokeTest(KafkaTest):
|
|||
'echo' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
'data' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
'min' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
'min-suppressed' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
'min-raw' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
'max' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
'sum' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
'sws-raw' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
'sws-suppressed' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
'dif' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
'cnt' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
'avg' : { 'partitions': 5, 'replication-factor': 1 },
|
||||
|
|
@ -40,39 +44,77 @@ class StreamsSmokeTest(KafkaTest):
|
|||
'tagg' : { 'partitions': 5, 'replication-factor': 1 }
|
||||
})
|
||||
|
||||
self.test_context = test_context
|
||||
self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
|
||||
self.processor1 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
|
||||
self.processor2 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
|
||||
self.processor3 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
|
||||
self.processor4 = StreamsSmokeTestJobRunnerService(test_context, self.kafka)
|
||||
|
||||
@cluster(num_nodes=9)
|
||||
def test_streams(self):
|
||||
"""
|
||||
Start a few smoke test clients, then repeat start a new one, stop (cleanly) running one a few times.
|
||||
Ensure that all results (stats on values computed by Kafka Streams) are correct.
|
||||
"""
|
||||
@cluster(num_nodes=8)
|
||||
@matrix(eos=[True, False], crash=[True, False])
|
||||
def test_streams(self, eos, crash):
|
||||
#
|
||||
if eos:
|
||||
processor1 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
|
||||
processor2 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
|
||||
processor3 = StreamsSmokeTestEOSJobRunnerService(self.test_context, self.kafka)
|
||||
else:
|
||||
processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
|
||||
processor2 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
|
||||
processor3 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
|
||||
|
||||
self.driver.start()
|
||||
|
||||
self.processor1.start()
|
||||
self.processor2.start()
|
||||
|
||||
time.sleep(15)
|
||||
with processor1.node.account.monitor_log(processor1.STDOUT_FILE) as monitor1:
|
||||
processor1.start()
|
||||
monitor1.wait_until('REBALANCING -> RUNNING',
|
||||
timeout_sec=60,
|
||||
err_msg="Never saw 'REBALANCING -> RUNNING' message " + str(processor1.node.account)
|
||||
)
|
||||
|
||||
self.processor3.start()
|
||||
self.processor1.stop()
|
||||
self.driver.start()
|
||||
|
||||
time.sleep(15)
|
||||
monitor1.wait_until('processed',
|
||||
timeout_sec=30,
|
||||
err_msg="Didn't see any processing messages " + str(processor1.node.account)
|
||||
)
|
||||
|
||||
self.processor4.start()
|
||||
# make sure we're not already done processing (which would invalidate the test)
|
||||
self.driver.node.account.ssh("! grep 'Result Verification' %s" % self.driver.STDOUT_FILE, allow_fail=False)
|
||||
|
||||
processor1.stop_nodes(not crash)
|
||||
|
||||
with processor2.node.account.monitor_log(processor2.STDOUT_FILE) as monitor2:
|
||||
processor2.start()
|
||||
monitor2.wait_until('REBALANCING -> RUNNING',
|
||||
timeout_sec=120,
|
||||
err_msg="Never saw 'REBALANCING -> RUNNING' message " + str(processor2.node.account)
|
||||
)
|
||||
monitor2.wait_until('processed',
|
||||
timeout_sec=30,
|
||||
err_msg="Didn't see any processing messages " + str(processor2.node.account)
|
||||
)
|
||||
|
||||
# make sure we're not already done processing (which would invalidate the test)
|
||||
self.driver.node.account.ssh("! grep 'Result Verification' %s" % self.driver.STDOUT_FILE, allow_fail=False)
|
||||
|
||||
processor2.stop_nodes(not crash)
|
||||
|
||||
with processor3.node.account.monitor_log(processor3.STDOUT_FILE) as monitor3:
|
||||
processor3.start()
|
||||
monitor3.wait_until('REBALANCING -> RUNNING',
|
||||
timeout_sec=120,
|
||||
err_msg="Never saw 'REBALANCING -> RUNNING' message " + str(processor3.node.account)
|
||||
)
|
||||
# there should still be some data left for this processor to work on.
|
||||
monitor3.wait_until('processed',
|
||||
timeout_sec=30,
|
||||
err_msg="Didn't see any processing messages " + str(processor3.node.account)
|
||||
)
|
||||
|
||||
self.driver.wait()
|
||||
self.driver.stop()
|
||||
|
||||
self.processor2.stop()
|
||||
self.processor3.stop()
|
||||
self.processor4.stop()
|
||||
processor3.stop()
|
||||
|
||||
node = self.driver.node
|
||||
node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)
|
||||
if crash and not eos:
|
||||
self.driver.node.account.ssh("grep -E 'SUCCESS|PROCESSED-MORE-THAN-GENERATED' %s" % self.driver.STDOUT_FILE, allow_fail=False)
|
||||
else:
|
||||
self.driver.node.account.ssh("grep SUCCESS %s" % self.driver.STDOUT_FILE, allow_fail=False)
|
||||
|
|
|
|||
Loading…
Reference in New Issue