MINOR: fix race condition in KafkaStreamsTest (#6185)

Reviewers: John Roesler <john@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
Matthias J. Sax 2019-01-23 16:55:54 -08:00 committed by Guozhang Wang
parent 0efed12f50
commit 86995adbde
1 changed files with 4 additions and 13 deletions

View File

@ -84,7 +84,6 @@ import static org.junit.Assert.fail;
@Category({IntegrationTest.class}) @Category({IntegrationTest.class})
public class KafkaStreamsTest { public class KafkaStreamsTest {
private static final long TIMEOUT_MS = 30_000L;
private static final int NUM_BROKERS = 1; private static final int NUM_BROKERS = 1;
private static final int NUM_THREADS = 2; private static final int NUM_THREADS = 2;
// We need this to avoid the KafkaConsumer hanging on poll // We need this to avoid the KafkaConsumer hanging on poll
@ -158,7 +157,6 @@ public class KafkaStreamsTest {
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> stateListener.numChanges == 2, () -> stateListener.numChanges == 2,
TIMEOUT_MS,
"Streams never started."); "Streams never started.");
Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state()); Assert.assertEquals(KafkaStreams.State.RUNNING, globalStreams.state());
@ -209,7 +207,9 @@ public class KafkaStreamsTest {
globalStreams.close(); globalStreams.close();
Assert.assertEquals(6, stateListener.numChanges); TestUtils.waitForCondition(
() -> stateListener.numChanges == 6,
"Streams never closed.");
Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state()); Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, globalStreams.state());
} }
@ -226,7 +226,6 @@ public class KafkaStreamsTest {
streams.close(); streams.close();
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> streams.state() == KafkaStreams.State.NOT_RUNNING, () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
TIMEOUT_MS,
"Streams never stopped."); "Streams never stopped.");
// Ensure that any created clients are closed // Ensure that any created clients are closed
@ -254,7 +253,6 @@ public class KafkaStreamsTest {
streams.start(); streams.start();
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING, () -> streams.state() == KafkaStreams.State.RUNNING,
TIMEOUT_MS,
"Streams never started."); "Streams never started.");
for (int i = 0; i < NUM_THREADS; i++) { for (int i = 0; i < NUM_THREADS; i++) {
@ -262,13 +260,11 @@ public class KafkaStreamsTest {
tmpThread.shutdown(); tmpThread.shutdown();
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> tmpThread.state() == StreamThread.State.DEAD, () -> tmpThread.state() == StreamThread.State.DEAD,
TIMEOUT_MS,
"Thread never stopped."); "Thread never stopped.");
threads[i].join(); threads[i].join();
} }
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> streams.state() == KafkaStreams.State.ERROR, () -> streams.state() == KafkaStreams.State.ERROR,
TIMEOUT_MS,
"Streams never stopped."); "Streams never stopped.");
} finally { } finally {
streams.close(); streams.close();
@ -276,7 +272,6 @@ public class KafkaStreamsTest {
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> streams.state() == KafkaStreams.State.NOT_RUNNING, () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
TIMEOUT_MS,
"Streams never stopped."); "Streams never stopped.");
final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread"); final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
@ -295,7 +290,6 @@ public class KafkaStreamsTest {
streams.start(); streams.start();
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> streams.state() == KafkaStreams.State.RUNNING, () -> streams.state() == KafkaStreams.State.RUNNING,
TIMEOUT_MS,
"Streams never started."); "Streams never started.");
final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread"); final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread");
globalThreadField.setAccessible(true); globalThreadField.setAccessible(true);
@ -303,7 +297,6 @@ public class KafkaStreamsTest {
globalStreamThread.shutdown(); globalStreamThread.shutdown();
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD,
TIMEOUT_MS,
"Thread never stopped."); "Thread never stopped.");
globalStreamThread.join(); globalStreamThread.join();
assertEquals(streams.state(), KafkaStreams.State.ERROR); assertEquals(streams.state(), KafkaStreams.State.ERROR);
@ -555,7 +548,6 @@ public class KafkaStreamsTest {
globalStreams.start(); globalStreams.start();
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> globalStreams.state() == KafkaStreams.State.RUNNING, () -> globalStreams.state() == KafkaStreams.State.RUNNING,
TIMEOUT_MS,
"Streams never started."); "Streams never started.");
try { try {
@ -613,7 +605,7 @@ public class KafkaStreamsTest {
th.start(); th.start();
try { try {
th.join(TIMEOUT_MS); th.join(30_000L);
assertFalse(th.isAlive()); assertFalse(th.isAlive());
} finally { } finally {
streams.close(); streams.close();
@ -763,7 +755,6 @@ public class KafkaStreamsTest {
final File taskDir = new File(appDir, "0_0"); final File taskDir = new File(appDir, "0_0");
TestUtils.waitForCondition( TestUtils.waitForCondition(
() -> !oldTaskDir.exists() && taskDir.exists(), () -> !oldTaskDir.exists() && taskDir.exists(),
TIMEOUT_MS,
"cleanup has not successfully run"); "cleanup has not successfully run");
assertTrue(taskDir.exists()); assertTrue(taskDir.exists());
} }