MINOR: Fix flaky shouldRejectNonExistentStoreName (#9426)

Fix flaky test by making sure Streams is
running before making assertions about IQ.

Reviewers: Lee Dongjin <dongjin@apache.org>, Guozhang Wang <guozhang@apache.org>, Chia-Ping Tsai <chia7712@apache.org>
This commit is contained in:
John Roesler 2020-10-29 12:23:55 -05:00 committed by GitHub
parent 933a813950
commit dc4d3ecbb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 12 additions and 8 deletions

View File

@ -105,7 +105,6 @@ import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getRunningStreams;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
@ -237,6 +236,7 @@ public class QueryableStateIntegrationTest {
kafkaStreams.close(ofSeconds(30));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
CLUSTER.deleteAllTopicsAndWait(0L);
}
/**
@ -442,12 +442,16 @@ public class QueryableStateIntegrationTest {
}
@Test
public void shouldRejectNonExistentStoreName() {
public void shouldRejectNonExistentStoreName() throws InterruptedException {
final String uniqueTestName = safeUniqueTestName(getClass(), testName);
final String input = uniqueTestName + "-input";
final String storeName = uniqueTestName + "-input-table";
final StreamsBuilder builder = new StreamsBuilder();
builder.table(
"input",
input,
Materialized
.<String, String, KeyValueStore<Bytes, byte[]>>as("input-table")
.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName)
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
);
@ -457,9 +461,11 @@ public class QueryableStateIntegrationTest {
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
));
try (final KafkaStreams streams = getStartedStreams(properties, builder, true)) {
CLUSTER.createTopic(input);
try (final KafkaStreams streams = getRunningStreams(properties, builder, true)) {
final ReadOnlyKeyValueStore<String, String> store =
streams.store(fromNameAndType("input-table", keyValueStore()));
streams.store(fromNameAndType(storeName, keyValueStore()));
assertThat(store, Matchers.notNullValue());
final InvalidStateStoreException exception = assertThrows(
@ -518,8 +524,6 @@ public class QueryableStateIntegrationTest {
" [class org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore]."
)
);
} finally {
CLUSTER.deleteAllTopicsAndWait(0L);
}
}