MINOR: update flaky KafkaStreamsTest (#16756)

testStateGlobalThreadClose() fails intermittently, and the root cause is
unclear. This PR attempts to fix it by cleaning up and improving the test
code across the board.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Matthias J. Sax 2024-08-21 14:23:27 -07:00 committed by GitHub
parent 0bb2aee838
commit 9d81a67009
1 changed file with 49 additions and 43 deletions

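For context, the sketch below illustrates the two cleanup patterns applied throughout the diff: giving each test method its own application.id so group ids and state directories cannot collide between tests, and wrapping every KafkaStreams instance in try-with-resources so the client is closed even when an assertion fails. This is an illustrative sketch only, not code from the PR; the class name, topic names, bootstrap address, and the TestInfo-based suffix (a stand-in for the internal safeUniqueTestName() helper used in the diff) are assumptions.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

public class CleanupPatternSketchTest {

    @Test
    public void shouldUseUniqueAppIdAndAutoClose(final TestInfo testInfo) {
        final Properties props = new Properties();
        // Unique application.id per test method (the PR uses the internal
        // safeUniqueTestName() helper; the display name is a simple stand-in).
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,
            "appId-" + testInfo.getDisplayName().replaceAll("[^a-zA-Z0-9]", ""));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        // try-with-resources closes the client even if start() or a later
        // assertion throws, so no threads or clients leak into the next test.
        // (With no broker at the assumed localhost:9092 address, the stream
        // threads simply retry until close() shuts them down.)
        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
            streams.start();
        }
    }
}

In the actual test class the KafkaStreams instances are additionally constructed with the MockClientSupplier and MockTime visible in the diff below, so no real broker is required there.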

@@ -30,6 +30,7 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.MockTime;
@@ -138,7 +139,7 @@ import static org.mockito.Mockito.when;
public class KafkaStreamsTest {
private static final int NUM_THREADS = 2;
private static final String APPLICATION_ID = "appId";
private static final String APPLICATION_ID = "appId-";
private static final String CLIENT_ID = "test-client";
private static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
@@ -180,14 +181,14 @@ public class KafkaStreamsTest {
}
@BeforeEach
public void before() throws Exception {
public void before(final TestInfo testInfo) throws Exception {
time = new MockTime();
supplier = new MockClientSupplier();
supplier.setCluster(Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999))));
adminClient = (MockAdminClient) supplier.getAdmin(null);
streamsStateListener = new StateListenerStub();
props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID + safeUniqueTestName(testInfo));
props.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2018");
props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
@@ -851,13 +852,14 @@ public class KafkaStreamsTest {
prepareStreams();
prepareStreamThread(streamThreadOne, 1);
prepareStreamThread(streamThreadTwo, 2);
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
try {
streams.cleanUp();
streams.start();
} finally {
streams.close();
streams.cleanUp();
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
try {
streams.cleanUp();
streams.start();
} finally {
streams.close();
streams.cleanUp();
}
}
}
@@ -913,19 +915,19 @@ public class KafkaStreamsTest {
prepareThreadState(streamThreadOne, state1);
prepareThreadState(streamThreadTwo, state2);
prepareTerminableThread(streamThreadOne);
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
streams.close(Duration.ZERO);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
assertThrows(IllegalStateException.class, streams::cleanUp);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
streams.close(Duration.ZERO);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
assertThrows(IllegalStateException.class, streams::cleanUp);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
}
}
@SuppressWarnings("unchecked")
@Test
public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupFalse() throws Exception {
prepareStreams();
@@ -963,19 +965,20 @@ public class KafkaStreamsTest {
prepareThreadState(streamThreadOne, state1);
prepareThreadState(streamThreadTwo, state2);
prepareTerminableThread(streamThreadOne);
final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
streams.start();
waitForCondition(
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING,
"Streams never started.");
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
closeOptions.timeout(Duration.ZERO);
streams.close(closeOptions);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
assertThrows(IllegalStateException.class, streams::cleanUp);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
streams.close(closeOptions);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
assertThrows(IllegalStateException.class, streams::cleanUp);
assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true));
}
}
@Test
@@ -1020,12 +1023,12 @@ public class KafkaStreamsTest {
prepareThreadState(streamThreadOne, state1);
prepareThreadState(streamThreadTwo, state2);
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
assertThrows(StreamsNotStartedException.class, () -> streams.queryMetadataForKey("store", "key", new StringSerializer()));
streams.start();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
streams.close();
waitForApplicationState(Collections.singletonList(streams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", new StringSerializer()));
}
}
@@ -1037,7 +1040,7 @@ public class KafkaStreamsTest {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
assertEquals(KeyQueryMetadata.NOT_AVAILABLE, streams.queryMetadataForKey("store", "key", Serdes.String().serializer()));
assertEquals(KeyQueryMetadata.NOT_AVAILABLE, streams.queryMetadataForKey("store", "key", new StringSerializer()));
}
}
@@ -1191,7 +1194,6 @@ public class KafkaStreamsTest {
}
}
@SuppressWarnings("unchecked")
@Test
public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupTrueWhenThreadsHaventTerminated() throws Exception {
prepareStreams();
@@ -1229,7 +1231,6 @@ public class KafkaStreamsTest {
}
}
@SuppressWarnings("unchecked")
@Test
public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() throws Exception {
prepareStreams();
@@ -1291,7 +1292,9 @@ public class KafkaStreamsTest {
final StreamsConfig mockConfig = spy(config);
when(mockConfig.getKafkaClientSupplier()).thenReturn(supplier);
new KafkaStreams(getBuilderWithSource().build(), mockConfig);
try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), mockConfig)) {
// no-op
}
// It's called once in above when mock
verify(mockConfig, times(2)).getKafkaClientSupplier();
}
@@ -1308,7 +1311,9 @@ public class KafkaStreamsTest {
final StreamsConfig mockConfig = spy(config);
when(mockConfig.getKafkaClientSupplier()).thenReturn(supplier);
new KafkaStreams(getBuilderWithSource().build(), mockConfig, time);
try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), mockConfig, time)) {
// no-op
}
// It's called once in above when mock
verify(mockConfig, times(2)).getKafkaClientSupplier();
}
@@ -1324,7 +1329,9 @@ public class KafkaStreamsTest {
final StreamsConfig config = new StreamsConfig(props);
final StreamsConfig mockConfig = spy(config);
new KafkaStreams(getBuilderWithSource().build(), mockConfig, supplier);
try (final KafkaStreams ignored = new KafkaStreams(getBuilderWithSource().build(), mockConfig, supplier)) {
// no-op
}
// It's called once in above when mock
verify(mockConfig, times(0)).getKafkaClientSupplier();
}
@@ -1389,7 +1396,7 @@ public class KafkaStreamsTest {
final String inputTopic = safeTestName + "-input";
final String outputTopic = safeTestName + "-output";
final Topology topology = new Topology();
topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
topology.addSource("source", new StringDeserializer(), new StringDeserializer(), inputTopic)
.addProcessor("process", () -> new Processor<String, String, String, String>() {
private ProcessorContext<String, String> context;
@@ -1444,8 +1451,7 @@ public class KafkaStreamsTest {
@Test
public void shouldThrowTopologyExceptionOnEmptyTopology() {
prepareStreams();
try {
new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
try (final KafkaStreams ignored = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)) {
fail("Should have thrown TopologyException");
} catch (final TopologyException e) {
assertThat(
@@ -1831,7 +1837,7 @@ public class KafkaStreamsTest {
Serdes.String(),
Serdes.Long());
final Topology topology = new Topology();
topology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic)
topology.addSource("source", new StringDeserializer(), new StringDeserializer(), inputTopic)
.addProcessor("process", () -> new Processor<String, String, String, String>() {
private ProcessorContext<String, String> context;
@@ -1858,8 +1864,8 @@ public class KafkaStreamsTest {
topology.addGlobalStore(
globalStoreBuilder,
"global",
Serdes.String().deserializer(),
Serdes.String().deserializer(),
new StringDeserializer(),
new StringDeserializer(),
globalTopicName,
globalTopicName + "-processor",
new MockProcessorSupplier<>());