KAFKA-4366: KafkaStreams.close() blocks indefinitely

Added `timeout` and `timeUnit` parameters to `KafkaStreams.close(..)`. The close now runs on a separate thread, which the caller `join`s for at most the provided `timeout`.
Changed `state` in `KafkaStreams` to use an enum.
Added a system test to ensure we don't deadlock on close when an uncaught exception handler that calls `System.exit(..)` is used and there is also a shutdown hook that calls `KafkaStreams.close(...)`.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang

Closes #2097 from dguy/kafka-4366
This commit is contained in:
Damian Guy 2016-11-17 12:49:20 -08:00 committed by Guozhang Wang
parent f3aad3b54b
commit 2daa10d77f
7 changed files with 280 additions and 43 deletions

View File

@ -91,12 +91,10 @@ public class KafkaStreams {
private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
private static final String JMX_PREFIX = "kafka.streams";
public static final int DEFAULT_CLOSE_TIMEOUT = 0;
// container states
private static final int CREATED = 0;
private static final int RUNNING = 1;
private static final int STOPPED = 2;
private int state = CREATED;
private enum StreamsState { created, running, stopped }
private StreamsState state = StreamsState.created;
private final StreamThread[] threads;
private final Metrics metrics;
@ -192,14 +190,13 @@ public class KafkaStreams {
public synchronized void start() {
log.debug("Starting Kafka Stream process");
if (state == CREATED) {
for (final StreamThread thread : threads)
if (state == StreamsState.created) {
for (final StreamThread thread : threads) {
thread.start();
state = RUNNING;
}
state = StreamsState.running;
log.info("Started Kafka Stream process");
} else if (state == RUNNING) {
} else if (state == StreamsState.running) {
throw new IllegalStateException("This process was already started.");
} else {
throw new IllegalStateException("Cannot restart after closing.");
@ -210,30 +207,60 @@ public class KafkaStreams {
* Shutdown this stream instance by signaling all the threads to stop,
* and then wait for them to join.
*
* @throws IllegalStateException if process has not started yet
* This will block until all threads have stopped.
*/
public synchronized void close() {
public void close() {
close(DEFAULT_CLOSE_TIMEOUT, TimeUnit.SECONDS);
}
/**
* Shutdown this stream instance by signaling all the threads to stop,
* and then wait up to the timeout for the threads to join.
*
* A timeout of 0 means to wait forever
*
* @param timeout how long to wait for {@link StreamThread}s to shutdown
* @param timeUnit unit of time used for timeout
* @return true if all threads were successfully stopped
*/
public synchronized boolean close(final long timeout, final TimeUnit timeUnit) {
log.debug("Stopping Kafka Stream process");
if (state == StreamsState.running) {
// save the current thread so that if it is a stream thread
// we don't attempt to join it and cause a deadlock
final Thread shutdown = new Thread(new Runnable() {
@Override
public void run() {
// signal the threads to stop and wait
for (final StreamThread thread : threads) {
thread.close();
}
if (state == RUNNING) {
// signal the threads to stop and wait
for (final StreamThread thread : threads)
thread.close();
for (final StreamThread thread : threads) {
try {
if (!thread.stillRunning()) {
thread.join();
}
} catch (final InterruptedException ex) {
Thread.interrupted();
}
}
for (final StreamThread thread : threads) {
try {
thread.join();
} catch (final InterruptedException ex) {
Thread.interrupted();
}
metrics.close();
log.info("Stopped Kafka Stream process");
}
}, "kafka-streams-close-thread");
shutdown.setDaemon(true);
shutdown.start();
try {
shutdown.join(TimeUnit.MILLISECONDS.convert(timeout, timeUnit));
} catch (InterruptedException e) {
Thread.interrupted();
}
state = StreamsState.stopped;
return !shutdown.isAlive();
}
if (state != STOPPED) {
metrics.close();
state = STOPPED;
log.info("Stopped Kafka Stream process");
}
return true;
}
@ -261,7 +288,7 @@ public class KafkaStreams {
* @throws IllegalStateException if instance is currently running
*/
public void cleanUp() {
if (state == RUNNING) {
if (state == StreamsState.running) {
throw new IllegalStateException("Cannot clean up while running.");
}
@ -377,7 +404,7 @@ public class KafkaStreams {
}
private void validateIsRunning() {
if (state != RUNNING) {
if (state != StreamsState.running) {
throw new IllegalStateException("KafkaStreams is not running");
}
}

View File

@ -290,6 +290,7 @@ public class StreamThread extends Thread {
removeStandbyTasks();
log.info("{} Stream thread shutdown complete", logPrefix);
running.set(false);
}
private void unAssignChangeLogPartitions(final boolean rethrowExceptions) {
@ -492,6 +493,7 @@ public class StreamThread extends Thread {
maybeClean();
}
log.debug("{} Shutting down at user request", logPrefix);
}
private void maybeUpdateStandbyTasks() {
@ -538,13 +540,8 @@ public class StreamThread extends Thread {
}
}
private boolean stillRunning() {
if (!running.get()) {
log.debug("{} Shutting down at user request", logPrefix);
return false;
}
return true;
public boolean stillRunning() {
return running.get();
}
private void maybePunctuate(StreamTask task) {

View File

@ -18,16 +18,25 @@
package org.apache.kafka.streams;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class KafkaStreamsTest {
@ -56,7 +65,7 @@ public class KafkaStreamsTest {
final int initCountDifference = newInitCount - oldInitCount;
assertTrue("some reporters should be initialized by calling start()", initCountDifference > 0);
streams.close();
assertTrue(streams.close(15, TimeUnit.SECONDS));
Assert.assertEquals("each reporter initialized should also be closed",
oldCloseCount + initCountDifference, MockMetricsReporter.CLOSE_COUNT.get());
}
@ -86,8 +95,8 @@ public class KafkaStreamsTest {
final KStreamBuilder builder = new KStreamBuilder();
final KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
streams.close();
try {
streams.start();
} catch (final IllegalStateException e) {
@ -147,6 +156,52 @@ public class KafkaStreamsTest {
});
}
/**
 * Checks that {@code close(timeout, unit)} returns {@code false} when a stream thread is
 * still busy inside a processor once the timeout has elapsed.
 *
 * The topology's only processor counts down a latch on the first record and then spins
 * (sleeping 10 ms per iteration) until {@code keepRunning} is cleared, so the stream thread
 * cannot finish within the 10 ms close timeout used below.
 */
@Test
public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
    final AtomicBoolean keepRunning = new AtomicBoolean(true);
    try {
        final Properties streamsConfig = new Properties();
        streamsConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
        streamsConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());

        final KStreamBuilder topology = new KStreamBuilder();
        final CountDownLatch recordSeen = new CountDownLatch(1);
        final String inputTopic = "input";
        CLUSTER.createTopic(inputTopic);

        // Processor that signals receipt of a record and then refuses to return
        // until the test releases it.
        final ForeachAction<String, String> blockUntilReleased = new ForeachAction<String, String>() {
            @Override
            public void apply(final String key, final String value) {
                try {
                    recordSeen.countDown();
                    while (keepRunning.get()) {
                        Thread.sleep(10);
                    }
                } catch (InterruptedException e) {
                    // no-op
                }
            }
        };
        topology.stream(Serdes.String(), Serdes.String(), inputTopic).foreach(blockUntilReleased);

        final KafkaStreams streams = new KafkaStreams(topology, streamsConfig);
        streams.start();

        // Publish a single record so the blocking processor gets invoked.
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            inputTopic,
            Collections.singletonList(new KeyValue<>("A", "A")),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(),
                StringSerializer.class,
                StringSerializer.class,
                new Properties()),
            System.currentTimeMillis());

        final boolean received = recordSeen.await(30, TimeUnit.SECONDS);
        assertTrue("Timed out waiting to receive single message", received);

        final boolean closedInTime = streams.close(10, TimeUnit.MILLISECONDS);
        assertFalse(closedInTime);
    } finally {
        // stop the thread so we don't interfere with other tests etc
        keepRunning.set(false);
    }
}
private KafkaStreams createKafkaStreams() {
final Properties props = new Properties();
@ -191,5 +246,4 @@ public class KafkaStreamsTest {
streams.close();
}
}
}

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.smoketest;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
 * Smoke-test application used to check that {@code KafkaStreams.close(..)} does not deadlock
 * during JVM shutdown.
 *
 * The topology throws from its only processor, the uncaught exception handler calls
 * {@code System.exit(-1)}, and a JVM shutdown hook calls {@code streams.close(5, SECONDS)}.
 */
public class ShutdownDeadlockTest {

    private final String kafka;
    private final String zookeeper;

    /**
     * @param kafka     value used for the bootstrap-servers setting of both the streams
     *                  instance and the producer
     * @param zookeeper value used for the streams ZooKeeper connect setting
     */
    public ShutdownDeadlockTest(final String kafka,
                                final String zookeeper) {
        this.kafka = kafka;
        this.zookeeper = zookeeper;
    }

    /**
     * Builds the failing topology, publishes one record to the {@code "source"} topic,
     * starts the streams instance and then blocks the calling thread in {@code wait()}.
     */
    public void start() {
        final String topic = "source";
        final Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "shouldNotDeadlock");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
        props.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), topic);

        // Every record makes the processor throw, which triggers the uncaught exception handler.
        source.foreach(new ForeachAction<String, String>() {
            @Override
            public void apply(final String key, final String value) {
                throw new RuntimeException("KABOOM!");
            }
        });

        final KafkaStreams streams = new KafkaStreams(builder, props);

        // Exiting from the handler starts JVM shutdown, which in turn runs the hook below.
        streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(final Thread t, final Throwable e) {
                System.exit(-1);
            }
        });

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                streams.close(5, TimeUnit.SECONDS);
            }
        }));

        final Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // The producer is only needed for this single record; close it once the record has
        // been flushed instead of leaking it for the lifetime of the process.
        try (final KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>(topic, "a", "a"));
            producer.flush();
        }

        streams.start();

        // Park the calling thread; the process is expected to end via System.exit(-1).
        synchronized (this) {
            try {
                wait();
            } catch (final InterruptedException e) {
                // Stop waiting, but keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
        }
    }
}

View File

@ -64,6 +64,10 @@ public class StreamsSmokeTest {
}
});
break;
case "close-deadlock-test":
final ShutdownDeadlockTest test = new ShutdownDeadlockTest(kafka, zookeeper);
test.start();
break;
default:
System.out.println("unknown command: " + command);
}

View File

@ -60,6 +60,10 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
except:
return []
def stop_nodes(self, clean_shutdown=True):
    """Call stop_node on every node in self.nodes, forwarding clean_shutdown unchanged."""
    for current in self.nodes:
        self.stop_node(current, clean_shutdown)
def stop_node(self, node, clean_shutdown=True):
self.logger.info((clean_shutdown and "Cleanly" or "Forcibly") + " stopping Streams Smoke Test on " + str(node.account))
pids = self.pids(node)
@ -80,6 +84,7 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
self.stop_node(node)
self.start_node(node)
def abortThenRestart(self):
# We don't want to do any clean up here, just abort then restart the process. The running service is killed immediately.
for node in self.nodes:
@ -88,10 +93,10 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
self.logger.info("Restarting Kafka Streams on " + str(node.account))
self.start_node(node)
def wait(self):
def wait(self, timeout_sec=180):
for node in self.nodes:
for pid in self.pids(node):
wait_until(lambda: not node.account.alive(pid), timeout_sec=180, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit")
wait_until(lambda: not node.account.alive(pid), timeout_sec=timeout_sec, err_msg="Streams Smoke Test process on " + str(node.account) + " took too long to exit")
def clean_node(self, node):
node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
@ -137,3 +142,7 @@ class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
def __init__(self, context, kafka):
super(StreamsSmokeTestJobRunnerService, self).__init__(context, kafka, "process")
class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
    """Smoke-test service that hands the "close-deadlock-test" command to the base service."""

    def __init__(self, context, kafka):
        command = "close-deadlock-test"
        super(StreamsSmokeTestShutdownDeadlockService, self).__init__(context, kafka, command)

View File

@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.mark import ignore
from kafkatest.tests.kafka_test import KafkaTest
from kafkatest.services.streams import StreamsSmokeTestShutdownDeadlockService
class StreamsShutdownDeadlockTest(KafkaTest):
    """
    System test that runs the Streams shutdown-deadlock smoke test service and
    checks that its process exits within the allowed time.
    """

    def __init__(self, test_context):
        # Single-partition, unreplicated 'source' topic on a one-broker, one-ZooKeeper cluster.
        topic_specs = {
            'source': {'partitions': 1, 'replication-factor': 1}
        }
        super(StreamsShutdownDeadlockTest, self).__init__(test_context, num_zk=1, num_brokers=1,
                                                          topics=topic_specs)

        self.driver = StreamsSmokeTestShutdownDeadlockService(test_context, self.kafka)

    def test_shutdown_wont_deadlock(self):
        """
        Start ShutdownDeadlockTest, wait for up to 1 minute, and check that the process exited.
        If it hasn't exited by then, fail: it is deadlocked.
        """
        driver = self.driver
        driver.start()

        # Raises if the process is still alive after 60 seconds.
        driver.wait(timeout_sec=60)

        driver.stop_nodes(clean_shutdown=False)
        driver.stop()