mirror of https://github.com/apache/kafka.git
KAFKA-15834: Remove NamedTopologyIntegrationTest case which leaks clients (#15185)
Signed-off-by: Greg Harris <greg.harris@aiven.io> Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
dd0916ef9a
commit
055ff2b831
|
@ -99,7 +99,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
|
|||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.hamcrest.CoreMatchers.notNullValue;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.singleton;
|
||||
|
@ -717,69 +716,6 @@ public class NamedTopologyIntegrationTest {
|
|||
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics() throws Exception {
|
||||
// This test leaks sockets due to KAFKA-15834
|
||||
try {
|
||||
CLUSTER.createTopic(EXISTING_STREAM, 2, 1);
|
||||
produceToInputTopics(EXISTING_STREAM, STANDARD_INPUT_DATA);
|
||||
setupSecondKafkaStreams();
|
||||
topology1Builder.stream(EXISTING_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
|
||||
topology1Builder2.stream(EXISTING_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
|
||||
|
||||
final TrackingExceptionHandler handler = new TrackingExceptionHandler();
|
||||
streams.setUncaughtExceptionHandler(handler);
|
||||
streams2.setUncaughtExceptionHandler(handler);
|
||||
|
||||
streams.addNamedTopology(topology1Builder.build());
|
||||
streams2.addNamedTopology(topology1Builder2.build());
|
||||
|
||||
IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, streams2));
|
||||
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
|
||||
|
||||
topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
|
||||
topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
|
||||
|
||||
assertThat(handler.nextError(TOPOLOGY_2), nullValue());
|
||||
|
||||
streams.addNamedTopology(topology2Builder.build());
|
||||
streams2.addNamedTopology(topology2Builder2.build());
|
||||
|
||||
// verify that the missing source topics were noticed and the handler invoked
|
||||
retryOnExceptionWithTimeout(() -> {
|
||||
final Throwable error = handler.nextError(TOPOLOGY_2);
|
||||
assertThat(error, notNullValue());
|
||||
assertThat(error.getCause().getClass(), is(MissingSourceTopicException.class));
|
||||
});
|
||||
|
||||
// make sure the original topology can continue processing while waiting on the new source topics
|
||||
produceToInputTopics(EXISTING_STREAM, singletonList(pair("A", 30L)));
|
||||
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 1), equalTo(singletonList(pair("A", 3L))));
|
||||
|
||||
CLUSTER.createTopic(NEW_STREAM, 2, 1);
|
||||
produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
|
||||
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
|
||||
|
||||
// Make sure the threads were not actually killed and replaced
|
||||
assertThat(streams.metadataForLocalThreads().size(), equalTo(2));
|
||||
assertThat(streams2.metadataForLocalThreads().size(), equalTo(2));
|
||||
|
||||
final Set<String> localThreadsNames = streams.metadataForLocalThreads().stream()
|
||||
.map(t -> extractThreadId(t.threadName()))
|
||||
.collect(Collectors.toSet());
|
||||
final Set<String> localThreadsNames2 = streams2.metadataForLocalThreads().stream()
|
||||
.map(t -> extractThreadId(t.threadName()))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
assertThat(localThreadsNames.contains("StreamThread-1"), is(true));
|
||||
assertThat(localThreadsNames.contains("StreamThread-2"), is(true));
|
||||
assertThat(localThreadsNames2.contains("StreamThread-1"), is(true));
|
||||
assertThat(localThreadsNames2.contains("StreamThread-2"), is(true));
|
||||
} finally {
|
||||
CLUSTER.deleteTopicsAndWait(EXISTING_STREAM, NEW_STREAM);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldWaitForMissingInputTopicsToBeCreated() throws Exception {
|
||||
setupSecondKafkaStreams();
|
||||
|
|
Loading…
Reference in New Issue