diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java index d9dcfbdc486..be9b058b71d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java @@ -48,6 +48,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -87,7 +88,7 @@ public class ConsistencyVectorIntegrationTest { @AfterEach public void after() { for (final KafkaStreams kafkaStreams : streamsToCleanup) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); } cluster.stop(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 0c0fbc69a50..8f1be1b4f5a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -56,6 +56,7 @@ import org.junit.jupiter.api.Timeout; import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -136,7 +137,7 @@ public class GlobalKTableEOSIntegrationTest { @AfterEach public void after() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 3cbce8d6c31..145ebf34d41 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -122,7 +122,7 @@ public class GlobalKTableIntegrationTest { @AfterEach public void whenShuttingDown() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java index b610b39240d..3869de0b136 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalStateReprocessTest.java @@ -51,6 +51,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -137,7 +138,7 @@ public class GlobalStateReprocessTest { @AfterEach public void after() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java index 7432eb67e01..d466a22130c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java @@ -157,7 +157,7 @@ public class GlobalThreadShutDownOrderTest { @AfterEach public void after() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java index bbb46d6bbe5..76ffaeebeea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -167,7 +167,7 @@ public class IQv2IntegrationTest { @AfterEach public void afterTest() { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); kafkaStreams.cleanUp(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java index be31768aa4a..69f70e2522a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java @@ -754,7 +754,7 @@ public class IQv2StoreIntegrationTest { public void afterTest() { // only needed because some of the PAPI cases aren't added yet. if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); kafkaStreams.cleanUp(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java index 7e88fd5e808..26a5aed5a52 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java @@ -122,7 +122,7 @@ public class IQv2VersionedStoreIntegrationTest { @AfterEach public void afterTest() { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); kafkaStreams.cleanUp(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index bb158837b8b..158a7c5a4a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -53,6 +53,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -117,7 +118,7 @@ public class KStreamAggregationDedupIntegrationTest { @AfterEach public void whenShuttingDown() throws IOException { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index abe5349e034..9e2e784c967 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -155,7 +155,7 @@ public class KStreamAggregationIntegrationTest { @AfterEach public void whenShuttingDown() throws Exception { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java index 3f55d7bb2ca..6aa70b77679 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java @@ -107,7 +107,7 @@ public class KStreamKStreamIntegrationTest { @AfterEach public void after() throws IOException { if (streams != null) { - streams.close(); + streams.close(Duration.ofSeconds(60)); streams = null; } IntegrationTestUtils.purgeLocalStreamsState(streamsConfig); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index a4a6d0d4424..74e888e5c32 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -138,7 +138,7 @@ public class KStreamRepartitionIntegrationTest { public void whenShuttingDown() throws IOException { kafkaStreamsInstances.stream() .filter(Objects::nonNull) - .forEach(KafkaStreams::close); + .forEach(ks -> ks.close(Duration.ofSeconds(60))); Utils.delete(testFolder); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java index 8afff79cc89..21f11847be9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java @@ -54,6 +54,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -158,15 +159,15 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest { @AfterEach public void after() throws IOException { if (streams != null) { - streams.close(); + streams.close(Duration.ofSeconds(60)); streams = null; } if (streamsTwo != null) { - streamsTwo.close(); + streamsTwo.close(Duration.ofSeconds(60)); streamsTwo = null; } if (streamsThree != null) { - streamsThree.close(); + streamsThree.close(Duration.ofSeconds(60)); streamsThree = null; } IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java index c0c688df153..45a7f6f4534 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java @@ -51,6 +51,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -163,15 +164,15 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest { @AfterEach public void after() throws IOException { if (streams != null) { - streams.close(); + streams.close(Duration.ofSeconds(60)); streams = null; } if (streamsTwo != null) { - streamsTwo.close(); + streamsTwo.close(Duration.ofSeconds(60)); streamsTwo = null; } if (streamsThree != null) { - streamsThree.close(); + streamsThree.close(Duration.ofSeconds(60)); streamsThree = null; } IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java index 58438efc5b1..86174d3f8be 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java @@ -44,6 +44,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -121,8 +122,8 @@ public class KTableKTableForeignKeyJoinDistributedTest { @AfterEach public void after() { - client1.close(); - client2.close(); + client1.close(Duration.ofSeconds(60)); + client2.close(Duration.ofSeconds(60)); quietlyCleanStateAfterTest(CLUSTER, client1); quietlyCleanStateAfterTest(CLUSTER, client2); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index db9977901d9..68e9d8a518e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -325,7 +325,7 @@ public class MetricsIntegrationTest { } private void closeApplication() throws Exception { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); kafkaStreams.cleanUp(); IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); final long timeout = 60000; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index ebf9674ac35..ed3751ee43b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -101,7 +101,7 @@ public class OptimizedKTableIntegrationTest { @AfterEach public void after() { for (final KafkaStreams kafkaStreams : streamsToCleanup) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java index bc21a2a8b6a..cad16762210 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java @@ -349,7 +349,7 @@ public class PositionRestartIntegrationTest { @AfterEach public void afterTest() { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); kafkaStreams.cleanUp(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 7fff8099a16..fe82f3360ff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -57,6 +57,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -149,7 +150,7 @@ public class RegexSourceIntegrationTest { @AfterEach public void tearDown() throws IOException { if (streams != null) { - streams.close(); + streams.close(Duration.ofSeconds(60)); } // Remove any state from previous test runs IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java index 53568a45999..b75b6c517c7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java @@ -57,6 +57,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -123,7 +124,7 @@ public class SlidingWindowedKStreamIntegrationTest { @AfterEach public void whenShuttingDown() throws IOException { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); kafkaStreams.cleanUp(); } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java index aa08da6da26..418f78c088d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -43,6 +43,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.time.Duration; import java.util.Properties; import java.util.function.Predicate; @@ -82,8 +83,8 @@ public class StandbyTaskCreationIntegrationTest { @AfterEach public void after() { - client1.close(); - client2.close(); + client1.close(Duration.ofSeconds(60)); + client2.close(Duration.ofSeconds(60)); } private Properties streamsConfiguration() { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index 1c3bd11957e..38dac86d711 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -112,13 +112,13 @@ public class StandbyTaskEOSIntegrationTest { @AfterEach public void cleanUp() { if (streamInstanceOne != null) { - streamInstanceOne.close(); + streamInstanceOne.close(Duration.ofSeconds(60)); } if (streamInstanceTwo != null) { - streamInstanceTwo.close(); + streamInstanceTwo.close(Duration.ofSeconds(60)); } if (streamInstanceOneRecovery != null) { - streamInstanceOneRecovery.close(); + streamInstanceOneRecovery.close(Duration.ofSeconds(60)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java index e4d355cdabe..1f8c9ba8e42 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java @@ -111,13 +111,13 @@ public class StandbyTaskEOSMultiRebalanceIntegrationTest { @AfterEach public void cleanUp() { if (streamInstanceOne != null) { - streamInstanceOne.close(); + streamInstanceOne.close(Duration.ofSeconds(60)); } if (streamInstanceTwo != null) { - streamInstanceTwo.close(); + streamInstanceTwo.close(Duration.ofSeconds(60)); } if (streamInstanceThree != null) { - streamInstanceThree.close(); + streamInstanceThree.close(Duration.ofSeconds(60)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java index a5ba13bd0b0..15eed5df049 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java @@ -119,7 +119,7 @@ public class StoreQueryIntegrationTest { @AfterEach public void after() { for (final KafkaStreams kafkaStreams : streamsToCleanup) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); } streamsToCleanup.clear(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java index 2b630b65ee4..f25ffca5d9f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java @@ -49,6 +49,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -107,7 +108,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest { @AfterEach public void whenShuttingDown() throws IOException { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java index 25f9afd5428..886908cec71 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java @@ -51,6 +51,7 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.Properties; @@ -102,7 +103,7 @@ public class SwallowUnknownTopicErrorIntegrationTest { public void after() throws InterruptedException { CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT); if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); kafkaStreams.cleanUp(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java index 38287d7556d..7c8592f29a4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java @@ -60,6 +60,7 @@ import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.EnumSource; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -130,7 +131,7 @@ public class TimeWindowedKStreamIntegrationTest { @AfterEach public void whenShuttingDown() throws IOException { if (kafkaStreams != null) { - kafkaStreams.close(); + kafkaStreams.close(Duration.ofSeconds(60)); kafkaStreams.cleanUp(); } IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java index 2ab34592604..c8727f468b3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; @@ -63,7 +64,7 @@ public class NamedTopologyTest { @AfterEach public void cleanup() { - streams.close(); + streams.close(Duration.ofSeconds(60)); } private static Properties configProps() {