mirror of https://github.com/apache/kafka.git
remove eos tests
This commit is contained in:
parent
d56ccebafc
commit
aa0f4ea808
|
@ -249,7 +249,7 @@
|
|||
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest|StreamTaskTest).java"/>
|
||||
|
||||
<suppress checks="NPathComplexity"
|
||||
files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest|IQv2StoreIntegrationTest).java"/>
|
||||
files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest|IQv2StoreIntegrationTest).java"/>
|
||||
|
||||
<suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
|
||||
files="Murmur3Test.java"/>
|
||||
|
|
|
@ -1,225 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.streams.tests;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
|
||||
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.StreamsBuilder;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
|
||||
import org.apache.kafka.streams.internals.ConsumerWrapper;
|
||||
import org.apache.kafka.streams.kstream.KGroupedStream;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.Materialized;
|
||||
import org.apache.kafka.streams.kstream.Produced;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class EosTestClient extends SmokeTestUtil {
|
||||
|
||||
static final String APP_ID = "EosTest";
|
||||
private final Properties properties;
|
||||
private final boolean withRepartitioning;
|
||||
private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
|
||||
private static final List<CapturingConsumerWrapper> CAPTURING_CONSUMER_WRAPPERS = new ArrayList<>();
|
||||
private int minGroupEpoch = 0;
|
||||
|
||||
private KafkaStreams streams;
|
||||
private boolean uncaughtException;
|
||||
|
||||
EosTestClient(final Properties properties, final boolean withRepartitioning) {
|
||||
super();
|
||||
this.properties = properties;
|
||||
this.withRepartitioning = withRepartitioning;
|
||||
this.properties.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER, CapturingConsumerWrapper.class);
|
||||
CAPTURING_CONSUMER_WRAPPERS.clear();
|
||||
}
|
||||
|
||||
private volatile boolean isRunning = true;
|
||||
|
||||
public void start() {
|
||||
Exit.addShutdownHook("streams-shutdown-hook", () -> {
|
||||
isRunning = false;
|
||||
streams.close(Duration.ofSeconds(300));
|
||||
|
||||
// need to wait for callback to avoid race condition
|
||||
// -> make sure the callback printout to stdout is there as it is expected test output
|
||||
waitForStateTransitionCallback();
|
||||
|
||||
// do not remove these printouts since they are needed for health scripts
|
||||
if (!uncaughtException) {
|
||||
System.out.println(System.currentTimeMillis());
|
||||
System.out.println("EOS-TEST-CLIENT-CLOSED");
|
||||
System.out.flush();
|
||||
}
|
||||
});
|
||||
|
||||
while (isRunning) {
|
||||
if (streams == null) {
|
||||
uncaughtException = false;
|
||||
|
||||
streams = createKafkaStreams(properties);
|
||||
streams.setUncaughtExceptionHandler(e -> {
|
||||
System.out.println(System.currentTimeMillis());
|
||||
System.out.println("EOS-TEST-CLIENT-EXCEPTION");
|
||||
e.printStackTrace();
|
||||
System.out.flush();
|
||||
uncaughtException = true;
|
||||
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
|
||||
});
|
||||
streams.setStateListener((newState, oldState) -> {
|
||||
// don't remove this -- it's required test output
|
||||
System.out.println(System.currentTimeMillis());
|
||||
System.out.println("StateChange: " + oldState + " -> " + newState);
|
||||
System.out.flush();
|
||||
if (newState == KafkaStreams.State.NOT_RUNNING) {
|
||||
notRunningCallbackReceived.set(true);
|
||||
}
|
||||
});
|
||||
streams.start();
|
||||
}
|
||||
if (uncaughtException) {
|
||||
streams.close(Duration.ofSeconds(60_000L));
|
||||
streams = null;
|
||||
}
|
||||
logGroupEpochBump();
|
||||
sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
private KafkaStreams createKafkaStreams(final Properties props) {
|
||||
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
|
||||
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
|
||||
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
|
||||
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, Duration.ofMinutes(1).toMillis());
|
||||
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, Integer.MAX_VALUE);
|
||||
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
|
||||
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
|
||||
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L); // increase commit interval to make sure a client is killed having an open transaction
|
||||
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
|
||||
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
|
||||
|
||||
final StreamsBuilder builder = new StreamsBuilder();
|
||||
final KStream<String, Integer> data = builder.stream("data");
|
||||
|
||||
data.to("echo");
|
||||
data.process(SmokeTestUtil.printProcessorSupplier("data"));
|
||||
|
||||
final KGroupedStream<String, Integer> groupedData = data.groupByKey();
|
||||
// min
|
||||
groupedData
|
||||
.aggregate(
|
||||
() -> Integer.MAX_VALUE,
|
||||
(aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
|
||||
Materialized.with(null, intSerde))
|
||||
.toStream()
|
||||
.to("min", Produced.with(stringSerde, intSerde));
|
||||
|
||||
// sum
|
||||
groupedData.aggregate(
|
||||
() -> 0L,
|
||||
(aggKey, value, aggregate) -> (long) value + aggregate,
|
||||
Materialized.with(null, longSerde))
|
||||
.toStream()
|
||||
.to("sum", Produced.with(stringSerde, longSerde));
|
||||
|
||||
if (withRepartitioning) {
|
||||
data.to("repartition");
|
||||
final KStream<String, Integer> repartitionedData = builder.stream("repartition");
|
||||
|
||||
repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition"));
|
||||
|
||||
final KGroupedStream<String, Integer> groupedDataAfterRepartitioning = repartitionedData.groupByKey();
|
||||
// max
|
||||
groupedDataAfterRepartitioning
|
||||
.aggregate(
|
||||
() -> Integer.MIN_VALUE,
|
||||
(aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
|
||||
Materialized.with(null, intSerde))
|
||||
.toStream()
|
||||
.to("max", Produced.with(stringSerde, intSerde));
|
||||
|
||||
// count
|
||||
groupedDataAfterRepartitioning.count()
|
||||
.toStream()
|
||||
.to("cnt", Produced.with(stringSerde, longSerde));
|
||||
}
|
||||
|
||||
return new KafkaStreams(builder.build(), props);
|
||||
}
|
||||
|
||||
private void waitForStateTransitionCallback() {
|
||||
final long maxWaitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(300);
|
||||
while (!notRunningCallbackReceived.get() && System.currentTimeMillis() < maxWaitTime) {
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
} catch (final InterruptedException ignoreAndSwallow) { /* just keep waiting */ }
|
||||
}
|
||||
if (!notRunningCallbackReceived.get()) {
|
||||
System.err.println("State transition callback to NOT_RUNNING never received. Timed out after 5 minutes.");
|
||||
System.err.flush();
|
||||
}
|
||||
}
|
||||
|
||||
// Used in the streams group protocol
|
||||
// Detect a completed rebalance by checking if the group epoch has been bumped for all threads.
|
||||
private void logGroupEpochBump() {
|
||||
int currentMin = Integer.MAX_VALUE;
|
||||
for (final CapturingConsumerWrapper consumer : CAPTURING_CONSUMER_WRAPPERS) {
|
||||
final int groupEpoch = consumer.lastSeenGroupEpoch;
|
||||
if (groupEpoch < currentMin) {
|
||||
currentMin = groupEpoch;
|
||||
}
|
||||
}
|
||||
if (currentMin > minGroupEpoch) {
|
||||
System.out.println("MemberEpochBump");
|
||||
}
|
||||
if (currentMin != Integer.MAX_VALUE) {
|
||||
minGroupEpoch = currentMin;
|
||||
}
|
||||
}
|
||||
|
||||
public static class CapturingConsumerWrapper extends ConsumerWrapper {
|
||||
|
||||
public volatile int lastSeenGroupEpoch = 0;
|
||||
|
||||
@Override
|
||||
public void wrapConsumer(final AsyncKafkaConsumer<byte[], byte[]> delegate, final Map<String, Object> config, final Optional<StreamsRebalanceData> streamsRebalanceData) {
|
||||
CAPTURING_CONSUMER_WRAPPERS.add(this);
|
||||
super.wrapConsumer(delegate, config, streamsRebalanceData);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumerGroupMetadata groupMetadata() {
|
||||
final ConsumerGroupMetadata consumerGroupMetadata = delegate.groupMetadata();
|
||||
lastSeenGroupEpoch = consumerGroupMetadata.generationId();
|
||||
return consumerGroupMetadata;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,678 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.streams.tests;
|
||||
|
||||
import org.apache.kafka.clients.admin.Admin;
|
||||
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
|
||||
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
|
||||
import org.apache.kafka.clients.admin.StreamsGroupDescription;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.IsolationLevel;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.errors.TimeoutException;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.IntegerDeserializer;
|
||||
import org.apache.kafka.common.serialization.IntegerSerializer;
|
||||
import org.apache.kafka.common.serialization.LongDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class EosTestDriver extends SmokeTestUtil {
|
||||
|
||||
private static final int MAX_NUMBER_OF_KEYS = 20000;
|
||||
private static final long MAX_IDLE_TIME_MS = 600000L;
|
||||
|
||||
private static volatile boolean isRunning = true;
|
||||
private static final CountDownLatch TERMINATED = new CountDownLatch(1);
|
||||
|
||||
private static int numRecordsProduced = 0;
|
||||
|
||||
private static synchronized void updateNumRecordsProduces(final int delta) {
|
||||
numRecordsProduced += delta;
|
||||
}
|
||||
|
||||
static void generate(final String kafka) {
|
||||
Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> {
|
||||
System.out.println("Terminating");
|
||||
isRunning = false;
|
||||
|
||||
try {
|
||||
if (TERMINATED.await(5L, TimeUnit.MINUTES)) {
|
||||
System.out.println("Terminated");
|
||||
} else {
|
||||
System.out.println("Terminated with timeout");
|
||||
}
|
||||
} catch (final InterruptedException swallow) {
|
||||
swallow.printStackTrace(System.err);
|
||||
System.out.println("Terminated with error");
|
||||
}
|
||||
System.err.flush();
|
||||
System.out.flush();
|
||||
});
|
||||
|
||||
final Properties producerProps = new Properties();
|
||||
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest");
|
||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
|
||||
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
|
||||
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
|
||||
|
||||
final Map<Integer, List<Long>> offsets = new HashMap<>();
|
||||
|
||||
try {
|
||||
try (final KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps)) {
|
||||
final Random rand = new Random(System.currentTimeMillis());
|
||||
|
||||
while (isRunning) {
|
||||
final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
|
||||
final int value = rand.nextInt(10000);
|
||||
|
||||
final ProducerRecord<String, Integer> record = new ProducerRecord<>("data", key, value);
|
||||
|
||||
producer.send(record, (metadata, exception) -> {
|
||||
if (exception != null) {
|
||||
exception.printStackTrace(System.err);
|
||||
System.err.flush();
|
||||
if (exception instanceof TimeoutException) {
|
||||
try {
|
||||
// message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
|
||||
final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
|
||||
updateNumRecordsProduces(-expired);
|
||||
} catch (final Exception ignore) {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
offsets.getOrDefault(metadata.partition(), new LinkedList<>()).add(metadata.offset());
|
||||
}
|
||||
});
|
||||
|
||||
updateNumRecordsProduces(1);
|
||||
if (numRecordsProduced % 1000 == 0) {
|
||||
System.out.println(numRecordsProduced + " records produced");
|
||||
System.out.flush();
|
||||
}
|
||||
Utils.sleep(rand.nextInt(10));
|
||||
}
|
||||
}
|
||||
System.out.println("Producer closed: " + numRecordsProduced + " records produced");
|
||||
System.out.flush();
|
||||
|
||||
// verify offsets
|
||||
for (final Map.Entry<Integer, List<Long>> offsetsOfPartition : offsets.entrySet()) {
|
||||
offsetsOfPartition.getValue().sort(Long::compareTo);
|
||||
for (int i = 0; i < offsetsOfPartition.getValue().size() - 1; ++i) {
|
||||
if (offsetsOfPartition.getValue().get(i) != i) {
|
||||
System.err.println("Offset for partition " + offsetsOfPartition.getKey() + " is not " + i + " as expected but " + offsetsOfPartition.getValue().get(i));
|
||||
System.err.flush();
|
||||
}
|
||||
}
|
||||
System.out.println("Max offset of partition " + offsetsOfPartition.getKey() + " is " + offsetsOfPartition.getValue().get(offsetsOfPartition.getValue().size() - 1));
|
||||
}
|
||||
|
||||
final Properties props = new Properties();
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
|
||||
|
||||
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
|
||||
final List<TopicPartition> partitions = getAllPartitions(consumer, "data");
|
||||
System.out.println("Partitions: " + partitions);
|
||||
System.out.flush();
|
||||
consumer.assign(partitions);
|
||||
consumer.seekToEnd(partitions);
|
||||
|
||||
for (final TopicPartition tp : partitions) {
|
||||
System.out.println("End-offset for " + tp + " is " + consumer.position(tp));
|
||||
System.out.flush();
|
||||
}
|
||||
}
|
||||
System.out.flush();
|
||||
} finally {
|
||||
TERMINATED.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public static void verify(final String kafka, final boolean withRepartitioning, final String groupProtocol) {
|
||||
final Properties props = new Properties();
|
||||
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
|
||||
|
||||
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
|
||||
verifyAllTransactionFinished(consumer, kafka, withRepartitioning);
|
||||
} catch (final Exception e) {
|
||||
e.printStackTrace(System.err);
|
||||
System.out.println("FAILED");
|
||||
return;
|
||||
}
|
||||
|
||||
final Map<TopicPartition, Long> committedOffsets;
|
||||
try (final Admin adminClient = Admin.create(props)) {
|
||||
ensureStreamsApplicationDown(adminClient, groupProtocol);
|
||||
|
||||
committedOffsets = getCommittedOffsets(adminClient, withRepartitioning);
|
||||
}
|
||||
|
||||
final String[] allInputTopics;
|
||||
final String[] allOutputTopics;
|
||||
if (withRepartitioning) {
|
||||
allInputTopics = new String[] {"data", "repartition"};
|
||||
allOutputTopics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"};
|
||||
} else {
|
||||
allInputTopics = new String[] {"data"};
|
||||
allOutputTopics = new String[] {"echo", "min", "sum"};
|
||||
}
|
||||
|
||||
final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> inputRecordsPerTopicPerPartition;
|
||||
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
|
||||
final List<TopicPartition> partitions = getAllPartitions(consumer, allInputTopics);
|
||||
consumer.assign(partitions);
|
||||
consumer.seekToBeginning(partitions);
|
||||
|
||||
inputRecordsPerTopicPerPartition = getRecords(consumer, committedOffsets, withRepartitioning, true);
|
||||
} catch (final Exception e) {
|
||||
e.printStackTrace(System.err);
|
||||
System.out.println("FAILED");
|
||||
return;
|
||||
}
|
||||
|
||||
final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> outputRecordsPerTopicPerPartition;
|
||||
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
|
||||
final List<TopicPartition> partitions = getAllPartitions(consumer, allOutputTopics);
|
||||
consumer.assign(partitions);
|
||||
consumer.seekToBeginning(partitions);
|
||||
|
||||
outputRecordsPerTopicPerPartition = getRecords(consumer, consumer.endOffsets(partitions), withRepartitioning, false);
|
||||
} catch (final Exception e) {
|
||||
e.printStackTrace(System.err);
|
||||
System.out.println("FAILED");
|
||||
return;
|
||||
}
|
||||
|
||||
verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("echo"));
|
||||
if (withRepartitioning) {
|
||||
verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("repartition"));
|
||||
}
|
||||
|
||||
verifyMin(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("min"));
|
||||
verifySum(inputRecordsPerTopicPerPartition.get("data"), outputRecordsPerTopicPerPartition.get("sum"));
|
||||
|
||||
if (withRepartitioning) {
|
||||
verifyMax(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("max"));
|
||||
verifyCnt(inputRecordsPerTopicPerPartition.get("repartition"), outputRecordsPerTopicPerPartition.get("cnt"));
|
||||
}
|
||||
|
||||
// do not modify: required test output
|
||||
System.out.println("ALL-RECORDS-DELIVERED");
|
||||
System.out.flush();
|
||||
}
|
||||
|
||||
private static void ensureStreamsApplicationDown(final Admin adminClient, final String groupProtocol) {
|
||||
final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
|
||||
boolean isEmpty;
|
||||
do {
|
||||
if (Objects.equals(groupProtocol, "streams")) {
|
||||
final StreamsGroupDescription description = getStreamsGroupDescription(adminClient);
|
||||
isEmpty = description.members().isEmpty();
|
||||
if (System.currentTimeMillis() > maxWaitTime && !isEmpty) {
|
||||
throwNotDownException(description);
|
||||
}
|
||||
} else {
|
||||
final ConsumerGroupDescription description = getConsumerGroupDescription(adminClient);
|
||||
isEmpty = description.members().isEmpty();
|
||||
if (System.currentTimeMillis() > maxWaitTime && !isEmpty) {
|
||||
throwNotDownException(description);
|
||||
}
|
||||
}
|
||||
sleep(1000L);
|
||||
} while (!isEmpty);
|
||||
}
|
||||
|
||||
private static void throwNotDownException(final Object description) {
|
||||
throw new RuntimeException(
|
||||
"Streams application not down after " + MAX_IDLE_TIME_MS / 1000L + " seconds. " +
|
||||
"Group: " + description
|
||||
);
|
||||
}
|
||||
|
||||
private static Map<TopicPartition, Long> getCommittedOffsets(final Admin adminClient,
|
||||
final boolean withRepartitioning) {
|
||||
final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap;
|
||||
|
||||
try {
|
||||
final ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(EosTestClient.APP_ID);
|
||||
topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
|
||||
} catch (final Exception e) {
|
||||
e.printStackTrace();
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
|
||||
|
||||
for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
|
||||
final String topic = entry.getKey().topic();
|
||||
if (topic.equals("data") || withRepartitioning && topic.equals("repartition")) {
|
||||
committedOffsets.put(entry.getKey(), entry.getValue().offset());
|
||||
}
|
||||
}
|
||||
|
||||
return committedOffsets;
|
||||
}
|
||||
|
||||
private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getRecords(final KafkaConsumer<byte[], byte[]> consumer,
|
||||
final Map<TopicPartition, Long> readEndOffsets,
|
||||
final boolean withRepartitioning,
|
||||
final boolean isInputTopic) {
|
||||
System.out.println("read end offset: " + readEndOffsets);
|
||||
final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
|
||||
final Map<TopicPartition, Long> maxReceivedOffsetPerPartition = new HashMap<>();
|
||||
final Map<TopicPartition, Long> maxConsumerPositionPerPartition = new HashMap<>();
|
||||
|
||||
long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
|
||||
boolean allRecordsReceived = false;
|
||||
while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
|
||||
final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(Duration.ofSeconds(1L));
|
||||
|
||||
for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
|
||||
maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
|
||||
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
|
||||
maxReceivedOffsetPerPartition.put(tp, record.offset());
|
||||
final long readEndOffset = readEndOffsets.get(tp);
|
||||
if (record.offset() < readEndOffset) {
|
||||
addRecord(record, recordPerTopicPerPartition, withRepartitioning);
|
||||
} else if (!isInputTopic) {
|
||||
throw new RuntimeException("FAIL: did receive more records than expected for " + tp
|
||||
+ " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset());
|
||||
}
|
||||
}
|
||||
|
||||
for (final TopicPartition tp : readEndOffsets.keySet()) {
|
||||
maxConsumerPositionPerPartition.put(tp, consumer.position(tp));
|
||||
if (consumer.position(tp) >= readEndOffsets.get(tp)) {
|
||||
consumer.pause(Collections.singletonList(tp));
|
||||
}
|
||||
}
|
||||
|
||||
allRecordsReceived = consumer.paused().size() == readEndOffsets.keySet().size();
|
||||
}
|
||||
|
||||
if (!allRecordsReceived) {
|
||||
System.err.println("Pause partitions (ie, received all data): " + consumer.paused());
|
||||
System.err.println("Max received offset per partition: " + maxReceivedOffsetPerPartition);
|
||||
System.err.println("Max consumer position per partition: " + maxConsumerPositionPerPartition);
|
||||
throw new RuntimeException("FAIL: did not receive all records after " + (MAX_IDLE_TIME_MS / 1000L) + " sec idle time.");
|
||||
}
|
||||
|
||||
return recordPerTopicPerPartition;
|
||||
}
|
||||
|
||||
private static void addRecord(final ConsumerRecord<byte[], byte[]> record,
|
||||
final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition,
|
||||
final boolean withRepartitioning) {
|
||||
|
||||
final String topic = record.topic();
|
||||
final TopicPartition partition = new TopicPartition(topic, record.partition());
|
||||
|
||||
if (verifyTopic(topic, withRepartitioning)) {
|
||||
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition =
|
||||
recordPerTopicPerPartition.computeIfAbsent(topic, k -> new HashMap<>());
|
||||
|
||||
final List<ConsumerRecord<byte[], byte[]>> records =
|
||||
topicRecordsPerPartition.computeIfAbsent(partition, k -> new ArrayList<>());
|
||||
|
||||
records.add(record);
|
||||
} else {
|
||||
throw new RuntimeException("FAIL: received data from unexpected topic: " + record);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean verifyTopic(final String topic,
|
||||
final boolean withRepartitioning) {
|
||||
final boolean validTopic = "data".equals(topic) || "echo".equals(topic) || "min".equals(topic) || "sum".equals(topic);
|
||||
|
||||
if (withRepartitioning) {
|
||||
return validTopic || "repartition".equals(topic) || "max".equals(topic) || "cnt".equals(topic);
|
||||
}
|
||||
|
||||
return validTopic;
|
||||
}
|
||||
|
||||
private static void verifyReceivedAllRecords(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> expectedRecords,
|
||||
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> receivedRecords) {
|
||||
if (expectedRecords.size() != receivedRecords.size()) {
|
||||
throw new RuntimeException("Result verification failed. Received " + receivedRecords.size() + " records but expected " + expectedRecords.size());
|
||||
}
|
||||
|
||||
final StringDeserializer stringDeserializer = new StringDeserializer();
|
||||
final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
|
||||
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
|
||||
final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
|
||||
final List<ConsumerRecord<byte[], byte[]>> receivedRecordsForPartition = partitionRecords.getValue();
|
||||
final List<ConsumerRecord<byte[], byte[]>> expectedRecordsForPartition = expectedRecords.get(inputTopicPartition);
|
||||
|
||||
System.out.println(partitionRecords.getKey() + " with " + receivedRecordsForPartition.size() + ", " +
|
||||
inputTopicPartition + " with " + expectedRecordsForPartition.size());
|
||||
|
||||
final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = expectedRecordsForPartition.iterator();
|
||||
RuntimeException exception = null;
|
||||
for (final ConsumerRecord<byte[], byte[]> receivedRecord : receivedRecordsForPartition) {
|
||||
if (!expectedRecord.hasNext()) {
|
||||
exception = new RuntimeException("Result verification failed for " + receivedRecord + " since there's no more expected record");
|
||||
}
|
||||
|
||||
final ConsumerRecord<byte[], byte[]> expected = expectedRecord.next();
|
||||
|
||||
final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
|
||||
final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
|
||||
final String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
|
||||
final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
|
||||
|
||||
if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
|
||||
exception = new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
|
||||
}
|
||||
}
|
||||
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void verifyMin(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
|
||||
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
|
||||
final StringDeserializer stringDeserializer = new StringDeserializer();
|
||||
final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
|
||||
|
||||
final HashMap<String, Integer> currentMinPerKey = new HashMap<>();
|
||||
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
|
||||
final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
|
||||
final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
|
||||
final List<ConsumerRecord<byte[], byte[]>> partitionMin = partitionRecords.getValue();
|
||||
|
||||
if (partitionInput.size() != partitionMin.size()) {
|
||||
throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
|
||||
+ partitionRecords.getKey() + " but received " + partitionMin.size());
|
||||
}
|
||||
|
||||
final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
|
||||
|
||||
for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionMin) {
|
||||
final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
|
||||
|
||||
final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
|
||||
final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
|
||||
final String key = stringDeserializer.deserialize(input.topic(), input.key());
|
||||
final int value = integerDeserializer.deserialize(input.topic(), input.value());
|
||||
|
||||
Integer min = currentMinPerKey.get(key);
|
||||
if (min == null) {
|
||||
min = value;
|
||||
} else {
|
||||
min = Math.min(min, value);
|
||||
}
|
||||
currentMinPerKey.put(key, min);
|
||||
|
||||
if (!receivedKey.equals(key) || receivedValue != min) {
|
||||
throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + min + "> but was <" + receivedKey + "," + receivedValue + ">");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void verifySum(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
|
||||
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
|
||||
final StringDeserializer stringDeserializer = new StringDeserializer();
|
||||
final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
|
||||
final LongDeserializer longDeserializer = new LongDeserializer();
|
||||
|
||||
final HashMap<String, Long> currentSumPerKey = new HashMap<>();
|
||||
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
|
||||
final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
|
||||
final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
|
||||
final List<ConsumerRecord<byte[], byte[]>> partitionSum = partitionRecords.getValue();
|
||||
|
||||
if (partitionInput.size() != partitionSum.size()) {
|
||||
throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
|
||||
+ partitionRecords.getKey() + " but received " + partitionSum.size());
|
||||
}
|
||||
|
||||
final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
|
||||
|
||||
for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionSum) {
|
||||
final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
|
||||
|
||||
final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
|
||||
final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
|
||||
final String key = stringDeserializer.deserialize(input.topic(), input.key());
|
||||
final int value = integerDeserializer.deserialize(input.topic(), input.value());
|
||||
|
||||
Long sum = currentSumPerKey.get(key);
|
||||
if (sum == null) {
|
||||
sum = (long) value;
|
||||
} else {
|
||||
sum += value;
|
||||
}
|
||||
currentSumPerKey.put(key, sum);
|
||||
|
||||
if (!receivedKey.equals(key) || receivedValue != sum) {
|
||||
throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + sum + "> but was <" + receivedKey + "," + receivedValue + ">");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void verifyMax(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
|
||||
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> maxPerTopicPerPartition) {
|
||||
final StringDeserializer stringDeserializer = new StringDeserializer();
|
||||
final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
|
||||
|
||||
final HashMap<String, Integer> currentMinPerKey = new HashMap<>();
|
||||
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : maxPerTopicPerPartition.entrySet()) {
|
||||
final TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition());
|
||||
final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
|
||||
final List<ConsumerRecord<byte[], byte[]>> partitionMax = partitionRecords.getValue();
|
||||
|
||||
if (partitionInput.size() != partitionMax.size()) {
|
||||
throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
|
||||
+ partitionRecords.getKey() + " but received " + partitionMax.size());
|
||||
}
|
||||
|
||||
final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
|
||||
|
||||
for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionMax) {
|
||||
final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
|
||||
|
||||
final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
|
||||
final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
|
||||
final String key = stringDeserializer.deserialize(input.topic(), input.key());
|
||||
final int value = integerDeserializer.deserialize(input.topic(), input.value());
|
||||
|
||||
|
||||
Integer max = currentMinPerKey.get(key);
|
||||
if (max == null) {
|
||||
max = Integer.MIN_VALUE;
|
||||
}
|
||||
max = Math.max(max, value);
|
||||
currentMinPerKey.put(key, max);
|
||||
|
||||
if (!receivedKey.equals(key) || receivedValue != max) {
|
||||
throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + max + "> but was <" + receivedKey + "," + receivedValue + ">");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void verifyCnt(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
|
||||
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> cntPerTopicPerPartition) {
|
||||
final StringDeserializer stringDeserializer = new StringDeserializer();
|
||||
final LongDeserializer longDeserializer = new LongDeserializer();
|
||||
|
||||
final HashMap<String, Long> currentSumPerKey = new HashMap<>();
|
||||
for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : cntPerTopicPerPartition.entrySet()) {
|
||||
final TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition());
|
||||
final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
|
||||
final List<ConsumerRecord<byte[], byte[]>> partitionCnt = partitionRecords.getValue();
|
||||
|
||||
if (partitionInput.size() != partitionCnt.size()) {
|
||||
throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
|
||||
+ partitionRecords.getKey() + " but received " + partitionCnt.size());
|
||||
}
|
||||
|
||||
final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
|
||||
|
||||
for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionCnt) {
|
||||
final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
|
||||
|
||||
final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
|
||||
final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
|
||||
final String key = stringDeserializer.deserialize(input.topic(), input.key());
|
||||
|
||||
Long cnt = currentSumPerKey.get(key);
|
||||
if (cnt == null) {
|
||||
cnt = 0L;
|
||||
}
|
||||
currentSumPerKey.put(key, ++cnt);
|
||||
|
||||
if (!receivedKey.equals(key) || receivedValue != cnt) {
|
||||
throw new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + key + "," + cnt + "> but was <" + receivedKey + "," + receivedValue + ">");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
|
||||
final String kafka,
|
||||
final boolean withRepartitioning) {
|
||||
final String[] topics;
|
||||
if (withRepartitioning) {
|
||||
topics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"};
|
||||
} else {
|
||||
topics = new String[] {"echo", "min", "sum"};
|
||||
}
|
||||
|
||||
final List<TopicPartition> partitions = getAllPartitions(consumer, topics);
|
||||
consumer.assign(partitions);
|
||||
consumer.seekToEnd(partitions);
|
||||
for (final TopicPartition tp : partitions) {
|
||||
System.out.println(tp + " at position " + consumer.position(tp));
|
||||
}
|
||||
|
||||
final Properties consumerProps = new Properties();
|
||||
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-uncommitted");
|
||||
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
|
||||
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
|
||||
|
||||
final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
|
||||
try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
|
||||
while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
|
||||
consumer.seekToEnd(partitions);
|
||||
final Map<TopicPartition, Long> topicEndOffsets = consumerUncommitted.endOffsets(partitions);
|
||||
|
||||
final Iterator<TopicPartition> iterator = partitions.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
final TopicPartition topicPartition = iterator.next();
|
||||
final long position = consumer.position(topicPartition);
|
||||
|
||||
if (position == topicEndOffsets.get(topicPartition)) {
|
||||
iterator.remove();
|
||||
System.out.println("Removing " + topicPartition + " at position " + position);
|
||||
} else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) {
|
||||
throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
|
||||
} else {
|
||||
System.out.println("Retry " + topicPartition + " at position " + position);
|
||||
}
|
||||
}
|
||||
sleep(1000L);
|
||||
}
|
||||
}
|
||||
|
||||
if (!partitions.isEmpty()) {
|
||||
throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
|
||||
}
|
||||
}
|
||||
|
||||
private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer,
|
||||
final String... topics) {
|
||||
final ArrayList<TopicPartition> partitions = new ArrayList<>();
|
||||
|
||||
for (final String topic : topics) {
|
||||
for (final PartitionInfo info : consumer.partitionsFor(topic)) {
|
||||
partitions.add(new TopicPartition(info.topic(), info.partition()));
|
||||
}
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
|
||||
private static ConsumerGroupDescription getConsumerGroupDescription(final Admin adminClient) {
|
||||
final ConsumerGroupDescription description;
|
||||
try {
|
||||
description = adminClient.describeConsumerGroups(Collections.singleton(EosTestClient.APP_ID))
|
||||
.describedGroups()
|
||||
.get(EosTestClient.APP_ID)
|
||||
.get(10, TimeUnit.SECONDS);
|
||||
} catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
|
||||
e.printStackTrace();
|
||||
throw new RuntimeException("Unexpected Exception getting group description", e);
|
||||
}
|
||||
return description;
|
||||
}
|
||||
|
||||
private static StreamsGroupDescription getStreamsGroupDescription(final Admin adminClient) {
|
||||
final StreamsGroupDescription description;
|
||||
try {
|
||||
description = adminClient.describeStreamsGroups(Collections.singleton(EosTestClient.APP_ID))
|
||||
.describedGroups()
|
||||
.get(EosTestClient.APP_ID)
|
||||
.get(10, TimeUnit.SECONDS);
|
||||
} catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
|
||||
e.printStackTrace();
|
||||
throw new RuntimeException("Unexpected Exception getting group description", e);
|
||||
}
|
||||
return description;
|
||||
}
|
||||
}
|
|
@ -381,25 +381,25 @@ public class SmokeTestDriver extends SmokeTestUtil {
|
|||
final int maxRecordsPerKey,
|
||||
final boolean eosEnabled) {
|
||||
final Properties props = createConsumerProperties(kafka);
|
||||
try (final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
|
||||
final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
|
||||
consumer.assign(partitions);
|
||||
consumer.seekToBeginning(partitions);
|
||||
final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
|
||||
final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
|
||||
consumer.assign(partitions);
|
||||
consumer.seekToBeginning(partitions);
|
||||
|
||||
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
|
||||
final RecordProcessingState state = new RecordProcessingState(recordsGenerated);
|
||||
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
|
||||
final int recordsGenerated = inputs.size() * maxRecordsPerKey;
|
||||
final RecordProcessingState state = new RecordProcessingState(recordsGenerated);
|
||||
final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
|
||||
|
||||
final long start = System.currentTimeMillis();
|
||||
final VerificationResult verificationResult = consumeAndProcessRecords(consumer, inputs, events, state, start, eosEnabled);
|
||||
final long start = System.currentTimeMillis();
|
||||
final VerificationResult verificationResult = consumeAndProcessRecords(consumer, inputs, events, state, start, eosEnabled);
|
||||
consumer.close();
|
||||
|
||||
final VerificationResult eosResult = performEosVerification(eosEnabled, kafka);
|
||||
if (!eosResult.passed()) {
|
||||
return eosResult;
|
||||
}
|
||||
|
||||
return validateAndReportResults(inputs, events, state, verificationResult, start, eosEnabled);
|
||||
final VerificationResult eosResult = performEosVerification(eosEnabled, kafka);
|
||||
if (!eosResult.passed()) {
|
||||
return eosResult;
|
||||
}
|
||||
|
||||
return validateAndReportResults(inputs, events, state, verificationResult, start, eosEnabled);
|
||||
}
|
||||
|
||||
private static Properties createConsumerProperties(final String kafka) {
|
||||
|
|
|
@ -1,91 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.streams.tests;
|
||||
|
||||
import org.apache.kafka.common.utils.Exit;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
public class StreamsEosTest {
|
||||
|
||||
/**
|
||||
* args ::= kafka propFileName command
|
||||
* command := "run" | "process" | "verify"
|
||||
*/
|
||||
public static void main(final String[] args) throws IOException {
|
||||
if (args.length < 2) {
|
||||
System.err.println("StreamsEosTest are expecting two parameters: propFile, command; but only see " + args.length + " parameter");
|
||||
Exit.exit(1);
|
||||
}
|
||||
|
||||
final String propFileName = args[0];
|
||||
final String command = args[1];
|
||||
|
||||
final Properties streamsProperties = Utils.loadProps(propFileName);
|
||||
final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
|
||||
final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
|
||||
|
||||
if (kafka == null) {
|
||||
System.err.println("No bootstrap kafka servers specified in " + StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
|
||||
Exit.exit(1);
|
||||
}
|
||||
|
||||
if ("process".equals(command) || "process-complex".equals(command)) {
|
||||
if (!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {
|
||||
|
||||
System.err.println("processingGuarantee must be " + StreamsConfig.EXACTLY_ONCE_V2);
|
||||
Exit.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("StreamsTest instance started");
|
||||
System.out.println("kafka=" + kafka);
|
||||
System.out.println("props=" + streamsProperties);
|
||||
System.out.println("command=" + command);
|
||||
System.out.flush();
|
||||
|
||||
if (command == null || propFileName == null) {
|
||||
Exit.exit(-1);
|
||||
}
|
||||
|
||||
switch (command) {
|
||||
case "run":
|
||||
EosTestDriver.generate(kafka);
|
||||
break;
|
||||
case "process":
|
||||
new EosTestClient(streamsProperties, false).start();
|
||||
break;
|
||||
case "process-complex":
|
||||
new EosTestClient(streamsProperties, true).start();
|
||||
break;
|
||||
case "verify":
|
||||
EosTestDriver.verify(kafka, false, streamsProperties.getProperty("group.protocol"));
|
||||
break;
|
||||
case "verify-complex":
|
||||
EosTestDriver.verify(kafka, true, streamsProperties.getProperty("group.protocol"));
|
||||
break;
|
||||
default:
|
||||
System.out.println("unknown command: " + command);
|
||||
System.out.flush();
|
||||
Exit.exit(-1);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -443,27 +443,6 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
|
|||
def __init__(self, test_context, kafka, processing_guarantee, group_protocol = 'classic', num_threads = 3, replication_factor = 3):
|
||||
super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", processing_guarantee, group_protocol, num_threads, replication_factor)
|
||||
|
||||
class StreamsEosTestDriverService(StreamsEosTestBaseService):
|
||||
def __init__(self, test_context, kafka):
|
||||
super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run", "classic")
|
||||
|
||||
class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
|
||||
def __init__(self, test_context, kafka, group_protocol):
|
||||
super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process", group_protocol)
|
||||
|
||||
class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
|
||||
def __init__(self, test_context, kafka, group_protocol):
|
||||
super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex", group_protocol)
|
||||
|
||||
class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
|
||||
def __init__(self, test_context, kafka, group_protocol):
|
||||
super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify", group_protocol)
|
||||
|
||||
|
||||
class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
|
||||
def __init__(self, test_context, kafka, group_protocol):
|
||||
super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex", group_protocol)
|
||||
|
||||
|
||||
class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
|
||||
def __init__(self, test_context, kafka):
|
||||
|
|
|
@ -1,183 +0,0 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from ducktape.mark import matrix
|
||||
from ducktape.mark.resource import cluster
|
||||
from kafkatest.services.kafka import quorum
|
||||
from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
|
||||
StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
|
||||
from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
|
||||
|
||||
class StreamsEosTest(BaseStreamsTest):
|
||||
"""
|
||||
Test of Kafka Streams exactly-once semantics
|
||||
"""
|
||||
|
||||
def __init__(self, test_context):
|
||||
super(StreamsEosTest, self).__init__(test_context, num_controllers=1, num_brokers=3, topics={
|
||||
'data': {'partitions': 5, 'replication-factor': 2},
|
||||
'echo': {'partitions': 5, 'replication-factor': 2},
|
||||
'min': {'partitions': 5, 'replication-factor': 2},
|
||||
'sum': {'partitions': 5, 'replication-factor': 2},
|
||||
'repartition': {'partitions': 5, 'replication-factor': 2},
|
||||
'max': {'partitions': 5, 'replication-factor': 2},
|
||||
'cnt': {'partitions': 5, 'replication-factor': 2}
|
||||
})
|
||||
self.driver = StreamsEosTestDriverService(test_context, self.kafka)
|
||||
self.test_context = test_context
|
||||
|
||||
@cluster(num_nodes=8)
|
||||
@matrix(metadata_quorum=[quorum.combined_kraft],
|
||||
group_protocol=["classic", "streams"])
|
||||
def test_rebalance_simple(self, metadata_quorum, group_protocol):
|
||||
self.group_protocol = group_protocol
|
||||
self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
|
||||
@cluster(num_nodes=8)
|
||||
@matrix(metadata_quorum=[quorum.combined_kraft],
|
||||
group_protocol=["classic", "streams"])
|
||||
def test_rebalance_complex(self, metadata_quorum, group_protocol):
|
||||
self.group_protocol = group_protocol
|
||||
self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
|
||||
|
||||
def run_rebalance(self, processor1, processor2, processor3, verifier):
|
||||
"""
|
||||
Starts and stops two test clients a few times.
|
||||
Ensure that all records are delivered exactly-once.
|
||||
"""
|
||||
|
||||
self.driver.start()
|
||||
|
||||
self.add_streams(processor1)
|
||||
processor1.clean_node_enabled = False
|
||||
self.add_streams2(processor1, processor2)
|
||||
self.add_streams3(processor1, processor2, processor3)
|
||||
self.stop_streams3(processor2, processor3, processor1)
|
||||
self.add_streams3(processor2, processor3, processor1)
|
||||
self.stop_streams3(processor1, processor3, processor2)
|
||||
self.stop_streams2(processor1, processor3)
|
||||
self.stop_streams(processor1)
|
||||
processor1.clean_node_enabled = True
|
||||
|
||||
self.driver.stop()
|
||||
|
||||
verifier.start()
|
||||
verifier.wait()
|
||||
|
||||
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
|
||||
|
||||
@cluster(num_nodes=8)
|
||||
@matrix(metadata_quorum=[quorum.combined_kraft],
|
||||
group_protocol=["classic", "streams"])
|
||||
def test_failure_and_recovery(self, metadata_quorum, group_protocol):
|
||||
self.group_protocol = group_protocol
|
||||
self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
|
||||
@cluster(num_nodes=8)
|
||||
@matrix(metadata_quorum=[quorum.combined_kraft],
|
||||
group_protocol=["classic", "streams"])
|
||||
def test_failure_and_recovery_complex(self, metadata_quorum, group_protocol):
|
||||
self.group_protocol = group_protocol
|
||||
self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
|
||||
StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
|
||||
|
||||
def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):
|
||||
"""
|
||||
Starts two test clients, then abort (kill -9) and restart them a few times.
|
||||
Ensure that all records are delivered exactly-once.
|
||||
"""
|
||||
|
||||
self.driver.start()
|
||||
|
||||
self.add_streams(processor1)
|
||||
processor1.clean_node_enabled = False
|
||||
self.add_streams2(processor1, processor2)
|
||||
self.add_streams3(processor1, processor2, processor3)
|
||||
self.abort_streams(processor2, processor3, processor1)
|
||||
self.add_streams3(processor2, processor3, processor1)
|
||||
self.abort_streams(processor2, processor3, processor1)
|
||||
self.add_streams3(processor2, processor3, processor1)
|
||||
self.abort_streams(processor1, processor3, processor2)
|
||||
self.stop_streams2(processor1, processor3)
|
||||
self.stop_streams(processor1)
|
||||
processor1.clean_node_enabled = True
|
||||
|
||||
self.driver.stop()
|
||||
|
||||
verifier.start()
|
||||
verifier.wait()
|
||||
|
||||
verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
|
||||
|
||||
def add_streams(self, processor):
|
||||
with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
|
||||
processor.start()
|
||||
self.wait_for_startup(monitor, processor)
|
||||
|
||||
def add_streams2(self, running_processor, processor_to_be_started):
|
||||
with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor:
|
||||
self.add_streams(processor_to_be_started)
|
||||
self.wait_for_startup(monitor, running_processor)
|
||||
|
||||
def add_streams3(self, running_processor1, running_processor2, processor_to_be_started):
|
||||
with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor:
|
||||
self.add_streams2(running_processor2, processor_to_be_started)
|
||||
self.wait_for_startup(monitor, running_processor1)
|
||||
|
||||
def stop_streams(self, processor_to_be_stopped):
|
||||
with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2:
|
||||
processor_to_be_stopped.stop()
|
||||
self.wait_for(monitor2, processor_to_be_stopped, "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING")
|
||||
|
||||
def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
|
||||
with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor:
|
||||
self.stop_streams(processor_to_be_stopped)
|
||||
self.wait_for_startup(monitor, keep_alive_processor)
|
||||
|
||||
def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped):
|
||||
with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor:
|
||||
self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
|
||||
self.wait_for_startup(monitor, keep_alive_processor1)
|
||||
|
||||
def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted):
|
||||
with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1:
|
||||
with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2:
|
||||
processor_to_be_aborted.stop_nodes(False)
|
||||
self.wait_for_startup(monitor2, keep_alive_processor2)
|
||||
self.wait_for_startup(monitor1, keep_alive_processor1)
|
||||
|
||||
def wait_for_startup(self, monitor, processor):
|
||||
if self.group_protocol == "classic":
|
||||
self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
|
||||
else:
|
||||
# In the streams group protocol, not all members will take part in the rebalance.
|
||||
# We can indirectly observe the progress of the group by seeing the member epoch being bumped.
|
||||
self.wait_for(monitor, processor, "MemberEpochBump")
|
||||
self.wait_for(monitor, processor, "processed [0-9]* records from topic")
|
||||
|
||||
@staticmethod
|
||||
def wait_for(monitor, processor, output):
|
||||
monitor.wait_until(output,
|
||||
timeout_sec=480,
|
||||
err_msg=("Never saw output '%s' on " % output) + str(processor.node.account))
|
Loading…
Reference in New Issue