From 055ff2b831193f5935f9efc2f7809f853f63de5f Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Tue, 16 Jan 2024 10:20:39 -0800 Subject: [PATCH] KAFKA-15834: Remove NamedTopologyIntegrationTest case which leaks clients (#15185) Signed-off-by: Greg Harris Reviewers: Anna Sophie Blee-Goldman , Matthias J. Sax --- .../NamedTopologyIntegrationTest.java | 64 ------------------- 1 file changed, 64 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index 1dede6617c5..e1d12352689 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -99,7 +99,6 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static java.util.Arrays.asList; import static java.util.Collections.singleton; @@ -717,69 +716,6 @@ public class NamedTopologyIntegrationTest { CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT); } - @Test - public void shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics() throws Exception { - // This test leaks sockets due to KAFKA-15834 - try { - CLUSTER.createTopic(EXISTING_STREAM, 2, 1); - produceToInputTopics(EXISTING_STREAM, STANDARD_INPUT_DATA); - setupSecondKafkaStreams(); - topology1Builder.stream(EXISTING_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1); - topology1Builder2.stream(EXISTING_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1); - - final TrackingExceptionHandler handler = new TrackingExceptionHandler(); - streams.setUncaughtExceptionHandler(handler); - streams2.setUncaughtExceptionHandler(handler); - - streams.addNamedTopology(topology1Builder.build()); - streams2.addNamedTopology(topology1Builder2.build()); - - IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, streams2)); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA)); - - topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2); - topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2); - - assertThat(handler.nextError(TOPOLOGY_2), nullValue()); - - streams.addNamedTopology(topology2Builder.build()); - streams2.addNamedTopology(topology2Builder2.build()); - - // verify that the missing source topics were noticed and the handler invoked - retryOnExceptionWithTimeout(() -> { - final Throwable error = handler.nextError(TOPOLOGY_2); - assertThat(error, notNullValue()); - assertThat(error.getCause().getClass(), is(MissingSourceTopicException.class)); - }); - - // make sure the original topology can continue processing while waiting on the new source topics - produceToInputTopics(EXISTING_STREAM, singletonList(pair("A", 30L))); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 1), equalTo(singletonList(pair("A", 3L)))); - - CLUSTER.createTopic(NEW_STREAM, 2, 1); - produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA); - assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA)); - - // Make sure the threads were not actually killed and replaced - assertThat(streams.metadataForLocalThreads().size(), equalTo(2)); - assertThat(streams2.metadataForLocalThreads().size(), equalTo(2)); - - final Set localThreadsNames = streams.metadataForLocalThreads().stream() - .map(t -> extractThreadId(t.threadName())) - .collect(Collectors.toSet()); - final Set localThreadsNames2 = streams2.metadataForLocalThreads().stream() - .map(t -> extractThreadId(t.threadName())) - .collect(Collectors.toSet()); - - assertThat(localThreadsNames.contains("StreamThread-1"), is(true)); - assertThat(localThreadsNames.contains("StreamThread-2"), is(true)); - assertThat(localThreadsNames2.contains("StreamThread-1"), is(true)); - assertThat(localThreadsNames2.contains("StreamThread-2"), is(true)); - } finally { - CLUSTER.deleteTopicsAndWait(EXISTING_STREAM, NEW_STREAM); - } - } - @Test public void shouldWaitForMissingInputTopicsToBeCreated() throws Exception { setupSecondKafkaStreams();