KAFKA-15834: Remove more leaky NamedTopologyIntegrationTest tests (#15243)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>
Greg Harris 2024-02-05 11:39:47 -08:00 committed by GitHub
parent 24a79c2613
commit 5f35b41e92
1 changed file with 0 additions and 157 deletions


@@ -18,8 +18,6 @@ package org.apache.kafka.streams.integration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -36,7 +34,6 @@ import org.apache.kafka.streams.StreamsMetadata;
 import org.apache.kafka.streams.errors.MissingSourceTopicException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
-import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -58,7 +55,6 @@ import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
 import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
 import org.apache.kafka.test.TestUtils;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -81,7 +77,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -92,17 +87,13 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DE
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.extractThreadId;
-import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
 @Timeout(600)
 @Tag("integration")
@@ -550,31 +541,6 @@ public class NamedTopologyIntegrationTest {
         assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
     }
-    @Test
-    public void shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() throws Exception {
-        CLUSTER.createTopic(DELAYED_INPUT_STREAM_1, 2, 1);
-        CLUSTER.createTopic(DELAYED_INPUT_STREAM_2, 2, 1);
-        try {
-            topology1Builder.stream(DELAYED_INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
-            topology2Builder.stream(DELAYED_INPUT_STREAM_2).map((k, v) -> {
-                throw new IllegalStateException("Should not process any records for removed topology-2");
-            });
-            streams.addNamedTopology(topology1Builder.build());
-            streams.addNamedTopology(topology2Builder.build());
-            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
-            streams.removeNamedTopology("topology-2").all().get();
-            produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
-            produceToInputTopics(DELAYED_INPUT_STREAM_2, STANDARD_INPUT_DATA);
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
-        } finally {
-            CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1, DELAYED_INPUT_STREAM_2);
-        }
-    }
     @Test
     public void shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology() throws Exception {
         CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
@@ -716,129 +682,6 @@ public class NamedTopologyIntegrationTest {
         CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
     }
-    @Test
-    public void shouldWaitForMissingInputTopicsToBeCreated() throws Exception {
-        setupSecondKafkaStreams();
-        topology1Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
-        topology1Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
-        final TrackingExceptionHandler handler = new TrackingExceptionHandler();
-        streams.setUncaughtExceptionHandler(handler);
-        streams2.setUncaughtExceptionHandler(handler);
-        streams.addNamedTopology(topology1Builder.build());
-        streams2.addNamedTopology(topology1Builder2.build());
-        IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, streams2));
-        retryOnExceptionWithTimeout(() -> {
-            final Throwable error = handler.nextError(TOPOLOGY_1);
-            assertThat(error, notNullValue());
-            assertThat(error.getCause().getClass(), is(MissingSourceTopicException.class));
-        });
-        try {
-            CLUSTER.createTopic(NEW_STREAM, 2, 1);
-            produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
-            final List<KeyValue<String, Integer>> output =
-                waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5);
-            output.retainAll(COUNT_OUTPUT_DATA);
-            assertThat(output, equalTo(COUNT_OUTPUT_DATA));
-            // Make sure the threads were not actually killed and replaced
-            assertThat(streams.metadataForLocalThreads().size(), equalTo(2));
-            assertThat(streams2.metadataForLocalThreads().size(), equalTo(2));
-            final Set<String> localThreadsNames = streams.metadataForLocalThreads().stream()
-                .map(t -> extractThreadId(t.threadName()))
-                .collect(Collectors.toSet());
-            final Set<String> localThreadsNames2 = streams2.metadataForLocalThreads().stream()
-                .map(t -> extractThreadId(t.threadName()))
-                .collect(Collectors.toSet());
-            assertThat(localThreadsNames.contains("StreamThread-1"), is(true));
-            assertThat(localThreadsNames.contains("StreamThread-2"), is(true));
-            assertThat(localThreadsNames2.contains("StreamThread-1"), is(true));
-            assertThat(localThreadsNames2.contains("StreamThread-2"), is(true));
-        } finally {
-            CLUSTER.deleteTopicsAndWait(NEW_STREAM);
-        }
-    }
-    @Test
-    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {
-        CLUSTER.createTopic(DELAYED_INPUT_STREAM_1, 2, 1);
-        CLUSTER.createTopic(DELAYED_INPUT_STREAM_2, 2, 1);
-        try {
-            final AtomicInteger noOutputExpected = new AtomicInteger(0);
-            final AtomicInteger outputExpected = new AtomicInteger(0);
-            props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
-            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 15000L);
-            props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
-            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
-            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
-            // Discard the pre-created streams and replace with test-specific topology
-            streams.close();
-            streams = new KafkaStreamsNamedTopologyWrapper(props);
-            streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
-            final NamedTopologyBuilder builder = streams.newNamedTopologyBuilder("topology_A");
-            builder.stream(DELAYED_INPUT_STREAM_1).peek((k, v) -> outputExpected.incrementAndGet()).to(OUTPUT_STREAM_1);
-            builder.stream(DELAYED_INPUT_STREAM_2)
-                .peek((k, v) -> {
-                    throw new RuntimeException("Kaboom");
-                })
-                .peek((k, v) -> noOutputExpected.incrementAndGet())
-                .to(OUTPUT_STREAM_2);
-            streams.addNamedTopology(builder.build());
-            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                DELAYED_INPUT_STREAM_2,
-                singletonList(
-                    new KeyValue<>(1, "A")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
-                DELAYED_INPUT_STREAM_1,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                ),
-                TestUtils.producerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerSerializer.class,
-                    StringSerializer.class,
-                    new Properties()),
-                0L);
-            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
-                TestUtils.consumerConfig(
-                    CLUSTER.bootstrapServers(),
-                    IntegerDeserializer.class,
-                    StringDeserializer.class
-                ),
-                OUTPUT_STREAM_1,
-                Arrays.asList(
-                    new KeyValue<>(1, "A"),
-                    new KeyValue<>(1, "B")
-                )
-            );
-            assertThat(noOutputExpected.get(), equalTo(0));
-            assertThat(outputExpected.get(), equalTo(2));
-        } finally {
-            CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1, DELAYED_INPUT_STREAM_2);
-        }
-    }
     /**
      * Validates that each metadata object has only partitions & state stores for its specific topology name and
      * asserts that {@code left} and {@code right} differ only by {@link StreamsMetadata#hostInfo()}
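
For context on the removed shouldBackOffTaskAndEmitDataWithinSameTopology test: the REPLACE_THREAD handler it installed is the standard Kafka Streams uncaught-exception-handler API (KIP-671), not something specific to the named-topology wrapper being tested here. A minimal sketch of that pattern against the plain KafkaStreams client, with a hypothetical pass-through topology and illustrative topic names "in" and "out":

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

    public class ReplaceThreadSketch {
        public static KafkaStreams start(final Properties props) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("in").to("out"); // hypothetical pass-through topology
            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            // Returning REPLACE_THREAD swaps a failed stream thread for a fresh one
            // instead of shutting the client down; set the handler before start().
            streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
            streams.start();
            return streams;
        }
    }

Because a replaced thread resumes from the last committed offsets, the deleted test could assert that the record which threw "Kaboom" was retried without producing output, while the healthy sub-topology kept emitting.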