KAFKA-17680 Add timeout to streams test teardown (#17346)

When calling KafkaStreams#close from teardown methods in integration tests, we need to pass timeout to avoid potentially blocking forever during teardown.

Reviewers: Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
David Arthur 2024-10-02 16:36:21 -04:00 committed by GitHub
parent ae6e53fab2
commit be1929c44c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 52 additions and 38 deletions

View File

@ -48,6 +48,7 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -87,7 +88,7 @@ public class ConsistencyVectorIntegrationTest {
@AfterEach
public void after() {
for (final KafkaStreams kafkaStreams : streamsToCleanup) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
}
cluster.stop();
}

View File

@ -56,6 +56,7 @@ import org.junit.jupiter.api.Timeout;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -136,7 +137,7 @@ public class GlobalKTableEOSIntegrationTest {
@AfterEach
public void after() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}

View File

@ -122,7 +122,7 @@ public class GlobalKTableIntegrationTest {
@AfterEach
public void whenShuttingDown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}

View File

@ -51,6 +51,7 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -137,7 +138,7 @@ public class GlobalStateReprocessTest {
@AfterEach
public void after() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}

View File

@ -157,7 +157,7 @@ public class GlobalThreadShutDownOrderTest {
@AfterEach
public void after() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}

View File

@ -167,7 +167,7 @@ public class IQv2IntegrationTest {
@AfterEach
public void afterTest() {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
kafkaStreams.cleanUp();
}

View File

@ -754,7 +754,7 @@ public class IQv2StoreIntegrationTest {
public void afterTest() {
// only needed because some of the PAPI cases aren't added yet.
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
kafkaStreams.cleanUp();
}
}

View File

@ -122,7 +122,7 @@ public class IQv2VersionedStoreIntegrationTest {
@AfterEach
public void afterTest() {
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
kafkaStreams.cleanUp();
}
}

View File

@ -53,6 +53,7 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
@ -117,7 +118,7 @@ public class KStreamAggregationDedupIntegrationTest {
@AfterEach
public void whenShuttingDown() throws IOException {
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}

View File

@ -155,7 +155,7 @@ public class KStreamAggregationIntegrationTest {
@AfterEach
public void whenShuttingDown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}

View File

@ -107,7 +107,7 @@ public class KStreamKStreamIntegrationTest {
@AfterEach
public void after() throws IOException {
if (streams != null) {
streams.close();
streams.close(Duration.ofSeconds(60));
streams = null;
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);

View File

@ -138,7 +138,7 @@ public class KStreamRepartitionIntegrationTest {
public void whenShuttingDown() throws IOException {
kafkaStreamsInstances.stream()
.filter(Objects::nonNull)
.forEach(KafkaStreams::close);
.forEach(ks -> ks.close(Duration.ofSeconds(60)));
Utils.delete(testFolder);
}

View File

@ -54,6 +54,7 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@ -158,15 +159,15 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
@AfterEach
public void after() throws IOException {
if (streams != null) {
streams.close();
streams.close(Duration.ofSeconds(60));
streams = null;
}
if (streamsTwo != null) {
streamsTwo.close();
streamsTwo.close(Duration.ofSeconds(60));
streamsTwo = null;
}
if (streamsThree != null) {
streamsThree.close();
streamsThree.close(Duration.ofSeconds(60));
streamsThree = null;
}
IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree));

View File

@ -51,6 +51,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -163,15 +164,15 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
@AfterEach
public void after() throws IOException {
if (streams != null) {
streams.close();
streams.close(Duration.ofSeconds(60));
streams = null;
}
if (streamsTwo != null) {
streamsTwo.close();
streamsTwo.close(Duration.ofSeconds(60));
streamsTwo = null;
}
if (streamsThree != null) {
streamsThree.close();
streamsThree.close(Duration.ofSeconds(60));
streamsThree = null;
}
IntegrationTestUtils.purgeLocalStreamsState(asList(streamsConfig, streamsConfigTwo, streamsConfigThree));

View File

@ -44,6 +44,7 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@ -121,8 +122,8 @@ public class KTableKTableForeignKeyJoinDistributedTest {
@AfterEach
public void after() {
client1.close();
client2.close();
client1.close(Duration.ofSeconds(60));
client2.close(Duration.ofSeconds(60));
quietlyCleanStateAfterTest(CLUSTER, client1);
quietlyCleanStateAfterTest(CLUSTER, client2);
}

View File

@ -325,7 +325,7 @@ public class MetricsIntegrationTest {
}
private void closeApplication() throws Exception {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
kafkaStreams.cleanUp();
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
final long timeout = 60000;

View File

@ -101,7 +101,7 @@ public class OptimizedKTableIntegrationTest {
@AfterEach
public void after() {
for (final KafkaStreams kafkaStreams : streamsToCleanup) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
}
}

View File

@ -349,7 +349,7 @@ public class PositionRestartIntegrationTest {
@AfterEach
public void afterTest() {
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
kafkaStreams.cleanUp();
}
}

View File

@ -57,6 +57,7 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -149,7 +150,7 @@ public class RegexSourceIntegrationTest {
@AfterEach
public void tearDown() throws IOException {
if (streams != null) {
streams.close();
streams.close(Duration.ofSeconds(60));
}
// Remove any state from previous test runs
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);

View File

@ -57,6 +57,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -123,7 +124,7 @@ public class SlidingWindowedKStreamIntegrationTest {
@AfterEach
public void whenShuttingDown() throws IOException {
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
kafkaStreams.cleanUp();
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);

View File

@ -43,6 +43,7 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import java.util.function.Predicate;
@ -82,8 +83,8 @@ public class StandbyTaskCreationIntegrationTest {
@AfterEach
public void after() {
client1.close();
client2.close();
client1.close(Duration.ofSeconds(60));
client2.close(Duration.ofSeconds(60));
}
private Properties streamsConfiguration() {

View File

@ -112,13 +112,13 @@ public class StandbyTaskEOSIntegrationTest {
@AfterEach
public void cleanUp() {
if (streamInstanceOne != null) {
streamInstanceOne.close();
streamInstanceOne.close(Duration.ofSeconds(60));
}
if (streamInstanceTwo != null) {
streamInstanceTwo.close();
streamInstanceTwo.close(Duration.ofSeconds(60));
}
if (streamInstanceOneRecovery != null) {
streamInstanceOneRecovery.close();
streamInstanceOneRecovery.close(Duration.ofSeconds(60));
}
}

View File

@ -111,13 +111,13 @@ public class StandbyTaskEOSMultiRebalanceIntegrationTest {
@AfterEach
public void cleanUp() {
if (streamInstanceOne != null) {
streamInstanceOne.close();
streamInstanceOne.close(Duration.ofSeconds(60));
}
if (streamInstanceTwo != null) {
streamInstanceTwo.close();
streamInstanceTwo.close(Duration.ofSeconds(60));
}
if (streamInstanceThree != null) {
streamInstanceThree.close();
streamInstanceThree.close(Duration.ofSeconds(60));
}
}

View File

@ -119,7 +119,7 @@ public class StoreQueryIntegrationTest {
@AfterEach
public void after() {
for (final KafkaStreams kafkaStreams : streamsToCleanup) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
}
streamsToCleanup.clear();
}

View File

@ -49,6 +49,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -107,7 +108,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest {
@AfterEach
public void whenShuttingDown() throws IOException {
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}

View File

@ -51,6 +51,7 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
@ -102,7 +103,7 @@ public class SwallowUnknownTopicErrorIntegrationTest {
public void after() throws InterruptedException {
CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT);
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
kafkaStreams.cleanUp();
}
}

View File

@ -60,6 +60,7 @@ import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -130,7 +131,7 @@ public class TimeWindowedKStreamIntegrationTest {
@AfterEach
public void whenShuttingDown() throws IOException {
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.close(Duration.ofSeconds(60));
kafkaStreams.cleanUp();
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);

View File

@ -33,6 +33,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
@ -63,7 +64,7 @@ public class NamedTopologyTest {
@AfterEach
public void cleanup() {
streams.close();
streams.close(Duration.ofSeconds(60));
}
private static Properties configProps() {