mirror of https://github.com/apache/kafka.git
KAFKA-5697: issue Consumer#wakeup during Streams shutdown
Wakeup consumers during shutdown to break them out of any internally blocking calls. Semantically, it should be fine to treat a WakeupException as "no work to do"; the threads' polling loops then continue, discover that they are supposed to shut down, and do so gracefully. The existing tests should be sufficient to verify there are no regressions.

Author: John Roesler <john@confluent.io>
Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #4930 from vvcephei/streams-client-wakeup-on-shutdown

Minor javadoc updates.
This commit is contained in:
parent af983267be
commit 2d8049b713
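In outline, the mechanism is: shutdown flips the thread state and calls Consumer#wakeup(); a consumer blocked inside poll() then throws WakeupException, which is translated into an empty batch so the loop comes back around and notices the state change. A minimal sketch of the pattern, with hypothetical names (PollLoopSketch, running, process); the real logic lives in ConsumerUtils, StreamThread, and GlobalStreamThread below:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical skeleton of the pattern this commit applies; not the actual StreamThread code.
public class PollLoopSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Consumer<byte[], byte[]> consumer;

    public PollLoopSketch(final Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    public void run() {
        while (running.get()) {
            ConsumerRecords<byte[], byte[]> records;
            try {
                records = consumer.poll(100);
            } catch (final WakeupException e) {
                // "no work to do": substitute an empty batch so the loop re-checks running
                records = new ConsumerRecords<>(
                    Collections.<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>emptyMap());
            }
            // process(records) would go here
        }
    }

    // Called from another thread; breaks the runner out of a blocking poll().
    public void shutdown() {
        running.set(false);
        consumer.wakeup();
    }
}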
@@ -67,6 +67,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -377,12 +378,20 @@ public class KafkaStreams {
     }

     /**
-     * Get read-only handle on global metrics registry.
+     * Get read-only handle on global metrics registry, including streams client's own metrics plus
+     * its embedded consumer clients' metrics.
      *
      * @return Map of all metrics.
      */
+    // TODO: we can add metrics for producer and admin client as well
     public Map<MetricName, ? extends Metric> metrics() {
-        return Collections.unmodifiableMap(metrics.metrics());
+        final Map<MetricName, Metric> result = new LinkedHashMap<>();
+        for (final StreamThread thread : threads) {
+            result.putAll(thread.consumerMetrics());
+        }
+        if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics());
+        result.putAll(metrics.metrics());
+        return Collections.unmodifiableMap(result);
     }

     /**
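For callers, the practical effect is that KafkaStreams#metrics() now returns one merged, read-only view. A small usage sketch (assuming streams is a started KafkaStreams instance; exact metric names vary by client version):

// Print every metric in the merged registry, including the embedded consumers' metrics.
for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
    System.out.println(entry.getKey().name() + " = " + entry.getValue().metricValue());
}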
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class ShutdownException extends StreamsException {
+    public ShutdownException(final String message) {
+        super(message);
+    }
+
+    public ShutdownException(final String message, final Throwable throwable) {
+        super(message, throwable);
+    }
+
+    public ShutdownException(final Throwable throwable) {
+        super(throwable);
+    }
+}
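ShutdownException is deliberately bare: it only signals that a blocking startup or restore path should abort because the client is shutting down. A sketch of the intended use, mirroring GlobalStateManagerImpl below:

// Inside a long-running restore loop: bail out as soon as shutdown is requested.
if (!isRunning.check()) {
    throw new ShutdownException("Streams is not running (any more)");
}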
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+import java.util.Collections;
+import java.util.List;
+
+public final class ConsumerUtils {
+    private ConsumerUtils() {}
+
+    public static <K, V> ConsumerRecords<K, V> poll(final Consumer<K, V> consumer, final long maxDurationMs) {
+        try {
+            return consumer.poll(maxDurationMs);
+        } catch (final WakeupException e) {
+            return new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<K, V>>>emptyMap());
+        }
+    }
+}
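The wrapper keeps the WakeupException-as-empty-result convention in one place, so every polling call site stays a one-liner. For example, as StoreChangelogReader does below:

// Returns an empty ConsumerRecords instead of throwing if the consumer was woken up.
final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);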
@@ -23,12 +23,14 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -46,6 +48,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * This class is responsible for the initialization, restoration, closing, flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
@@ -60,13 +64,15 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
     private InternalProcessorContext processorContext;
     private final int retries;
     private final long retryBackoffMs;
+    private final IsRunning isRunning;

     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
                                   final Consumer<byte[], byte[]> globalConsumer,
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener stateRestoreListener,
-                                  final StreamsConfig config) {
+                                  final StreamsConfig config,
+                                  final IsRunning isRunning) {
         super(stateDirectory.globalStateDir());

         this.log = logContext.logger(GlobalStateManagerImpl.class);
@@ -76,6 +82,11 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
         this.stateRestoreListener = stateRestoreListener;
         this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
         this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.isRunning = isRunning;
     }

+    public interface IsRunning {
+        boolean check();
+    }
+
     @Override
@@ -200,6 +211,13 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
             try {
                 partitionInfos = globalConsumer.partitionsFor(sourceTopic);
                 break;
+            } catch (final WakeupException wakeupException) {
+                if (isRunning.check()) {
+                    // note we may decide later that this condition is ok and just let the retry loop continue
+                    throw new IllegalStateException("Got unexpected WakeupException during initialization.", wakeupException);
+                } else {
+                    throw new ShutdownException("Shutting down from fetching partitions");
+                }
             } catch (final TimeoutException retryableException) {
                 if (++attempts > retries) {
                     log.error("Failed to get partitions for topic {} after {} retry attempts due to timeout. " +
@@ -250,19 +268,20 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob

             long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
-            BatchingStateRestoreCallback
-                stateRestoreAdapter =
-                (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof
-                                                     BatchingStateRestoreCallback)
-                                                    ? stateRestoreCallback
-                                                    : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
+            final BatchingStateRestoreCallback stateRestoreAdapter =
+                (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof BatchingStateRestoreCallback)
+                                                    ? stateRestoreCallback
+                                                    : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));

             stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
             long restoreCount = 0L;

             while (offset < highWatermark) {
+                if (!isRunning.check()) {
+                    throw new ShutdownException("Streams is not running (any more)");
+                }
                 try {
-                    final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100);
+                    final ConsumerRecords<byte[], byte[]> records = poll(globalConsumer, 100);
                     final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
                     for (ConsumerRecord<byte[], byte[]> record : records) {
                         if (record.key() != null) {
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
@@ -27,6 +29,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -35,10 +38,12 @@ import org.slf4j.Logger;

 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;

@@ -103,6 +108,10 @@ public class GlobalStreamThread extends Thread {
             return equals(RUNNING);
         }

+        public boolean isStarting() {
+            return equals(CREATED);
+        }
+
         @Override
         public boolean isValidTransition(final ThreadStateTransitionValidator newState) {
             final State tmpState = (State) newState;
@@ -170,6 +179,12 @@ public class GlobalStreamThread extends Thread {
         }
     }

+    private boolean stillStarting() {
+        synchronized (stateLock) {
+            return state.isStarting();
+        }
+    }
+
     public GlobalStreamThread(final ProcessorTopology topology,
                               final StreamsConfig config,
                               final Consumer<byte[], byte[]> globalConsumer,
@@ -232,7 +247,7 @@ public class GlobalStreamThread extends Thread {

         void pollAndUpdate() {
             try {
-                final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
+                final ConsumerRecords<byte[], byte[]> received = poll(globalConsumer, pollMs);
                 for (final ConsumerRecord<byte[], byte[]> record : received) {
                     stateMaintainer.update(record);
                 }
@@ -263,7 +278,19 @@ public class GlobalStreamThread extends Thread {

     @Override
     public void run() {
-        final StateConsumer stateConsumer = initialize();
+        final StateConsumer stateConsumer;
+        try {
+            stateConsumer = initialize();
+        } catch (final ShutdownException e) {
+            log.info("Shutting down from initialization");
+            // Almost certainly, we arrived here because the state is already PENDING_SHUTDOWN, but it's harmless to
+            // just make sure
+            setState(State.PENDING_SHUTDOWN);
+            setState(State.DEAD);
+            streamsMetrics.removeAllThreadLevelSensors();
+            log.info("Shutdown complete");
+            return;
+        }

         if (stateConsumer == null) {
             // during initialization, the caller thread would wait for the state consumer
@@ -275,6 +302,7 @@ public class GlobalStreamThread extends Thread {
             setState(State.DEAD);

             log.warn("Error happened during initialization of the global state store; this thread has shutdown");
+            streamsMetrics.removeAllThreadLevelSensors();

             return;
         }
@@ -314,7 +342,14 @@ public class GlobalStreamThread extends Thread {
                 globalConsumer,
                 stateDirectory,
                 stateRestoreListener,
-                config);
+                config,
+                new GlobalStateManagerImpl.IsRunning() {
+                    @Override
+                    public boolean check() {
+                        return stillStarting() || stillRunning();
+                    }
+                }
+            );

             final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(
                 config,
@@ -367,5 +402,10 @@ public class GlobalStreamThread extends Thread {
         // one could call shutdown() multiple times, so ignore subsequent calls
         // if already shutting down or dead
         setState(PENDING_SHUTDOWN);
+        globalConsumer.wakeup();
     }
+
+    public Map<MetricName, Metric> consumerMetrics() {
+        return Collections.unmodifiableMap(globalConsumer.metrics());
+    }
 }
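Note the ordering in run(): even when initialize() aborts with a ShutdownException, the thread still walks through PENDING_SHUTDOWN to DEAD and removes its thread-level sensors, presumably so that a caller blocked waiting on the global thread's state observes a terminal state instead of hanging.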
@@ -40,6 +40,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class StoreChangelogReader implements ChangelogReader {

     private final Logger log;
@@ -81,7 +83,7 @@ public class StoreChangelogReader implements ChangelogReader {

         final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
         try {
-            final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
+            final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
             for (final TopicPartition partition : restoringPartitions) {
                 restorePartition(allRecords, partition, active.restoringTaskFor(partition));
             }
@@ -26,6 +26,8 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -54,6 +56,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -62,6 +65,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;

 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;

 public class StreamThread extends Thread {

@@ -824,7 +828,7 @@ public class StreamThread extends Thread {
             ConsumerRecords<byte[], byte[]> records = null;

             try {
-                records = consumer.poll(pollTimeMs);
+                records = poll(consumer, pollTimeMs);
             } catch (final InvalidOffsetException e) {
                 resetInvalidOffsets(e);
             }
@@ -1051,7 +1055,7 @@ public class StreamThread extends Thread {
         }

         try {
-            final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+            final ConsumerRecords<byte[], byte[]> records = poll(restoreConsumer, 0);

             if (!records.isEmpty()) {
                 for (final TopicPartition partition : records.partitions()) {
@@ -1116,6 +1120,8 @@ public class StreamThread extends Thread {
     public void shutdown() {
         log.info("Informed to shut down");
         final State oldState = setState(State.PENDING_SHUTDOWN);
+        consumer.wakeup();
+        restoreConsumer.wakeup();
         if (oldState == State.CREATED) {
             // The thread may not have been started. Take responsibility for shutting down
             completeShutdown(true);
@@ -1210,4 +1216,13 @@ public class StreamThread extends Thread {
     Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
         return standbyRecords;
     }
+
+    public Map<MetricName, Metric> consumerMetrics() {
+        final Map<MetricName, ? extends Metric> consumerMetrics = consumer.metrics();
+        final Map<MetricName, ? extends Metric> restoreConsumerMetrics = restoreConsumer.metrics();
+        final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
+        result.putAll(consumerMetrics);
+        result.putAll(restoreConsumerMetrics);
+        return result;
+    }
 }
@@ -18,8 +18,6 @@ package org.apache.kafka.streams;

 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.config.ConfigException;
@@ -43,6 +41,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;

@@ -232,7 +231,43 @@ public class KafkaStreamsTest {

         streams.close();
         assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
     }

+    @Ignore // this test cannot pass as long as GST blocks KS.start()
+    @Test
+    public void testGlobalThreadCloseWithoutConnectingToBroker() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
+        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        // make sure we have the global state thread running too
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.start();
+        streams.close();
+        // There's nothing to assert... We're testing that this operation actually completes.
+    }
+
+    @Test
+    public void testLocalThreadCloseWithoutConnectingToBroker() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
+        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        // make sure we have the global state thread running too
+        builder.table("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.start();
+        streams.close();
+        // There's nothing to assert... We're testing that this operation actually completes.
+    }
+
@@ -327,16 +362,6 @@ public class KafkaStreamsTest {
         }
     }

-    @Test
-    public void testNumberDefaultMetrics() {
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        final Map<MetricName, ? extends Metric> metrics = streams.metrics();
-        // all 22 default StreamThread metrics + 1 metric that keeps track of number of metrics
-        assertEquals(23, metrics.size());
-    }
-
     @Test
     public void testIllegalMetricsConfig() {
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
@@ -55,6 +55,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;

+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

@@ -118,8 +119,9 @@ public class InternalTopicIntegrationTest {
         final Map<String, Properties> topicConfigs = scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());

         for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
-            if (topicConfig.getKey().equals(changelog))
+            if (topicConfig.getKey().equals(changelog)) {
                 return topicConfig.getValue();
+            }
         }

         return new Properties();
@@ -157,6 +159,7 @@ public class InternalTopicIntegrationTest {
         //
         // Step 3: Verify the state changelog topics are compact
         //
+        waitForCompletion(streams, 2, 5000);
         streams.close();

         final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts"));
@@ -201,6 +204,7 @@ public class InternalTopicIntegrationTest {
         //
         // Step 3: Verify the state changelog topics are compact
         //
+        waitForCompletion(streams, 2, 5000);
         streams.close();
         final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows"));
         final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
@@ -28,10 +28,12 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.test.TestCondition;
@@ -49,6 +51,8 @@ import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * Utility functions to make integration testing more convenient.
  */
@@ -158,6 +162,40 @@ public class IntegrationTestUtils {
         produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time, enableTransactions);
     }

+    /**
+     * Wait for streams to "finish", based on the consumer lag metric.
+     *
+     * Caveats:
+     * - Inputs must be finite, fully loaded, and flushed before this method is called
+     * - expectedPartitions is the total number of partitions to watch the lag on, including both input and internal.
+     *   It's somewhat ok to get this wrong, as the main failure case would be an immediate return due to the clients
+     *   not being initialized, which you can avoid with any non-zero value. But it's probably better to get it right ;)
+     */
+    public static void waitForCompletion(final KafkaStreams streams,
+                                         final int expectedPartitions,
+                                         final int timeoutMilliseconds) {
+        final long start = System.currentTimeMillis();
+        while (true) {
+            int lagMetrics = 0;
+            double totalLag = 0.0;
+            for (final Metric metric : streams.metrics().values()) {
+                if (metric.metricName().name().equals("records-lag")) {
+                    lagMetrics++;
+                    totalLag += ((Number) metric.metricValue()).doubleValue();
+                }
+            }
+            if (lagMetrics >= expectedPartitions && totalLag == 0.0) {
+                return;
+            }
+            if (System.currentTimeMillis() - start >= timeoutMilliseconds) {
+                throw new RuntimeException(String.format(
+                    "Timed out waiting for completion. lagMetrics=[%s/%s] totalLag=[%s]",
+                    lagMetrics, expectedPartitions, totalLag
+                ));
+            }
+        }
+    }
+
     public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                                   final String topic,
                                                                                   final int expectedNumRecords) throws InterruptedException {
@@ -352,7 +390,7 @@ public class IntegrationTestUtils {
         while (totalPollTimeMs < waitTime &&
             continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
-            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+            final ConsumerRecords<K, V> records = poll(consumer, pollIntervalMs);

             for (final ConsumerRecord<K, V> record : records) {
                 consumedValues.add(new KeyValue<>(record.key(), record.value()));
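A test would typically load a finite input and then gate its assertions on the lag metric, as InternalTopicIntegrationTest does above. A usage sketch:

// Block until all 2 watched partitions report zero consumer lag, or fail after 5 seconds.
waitForCompletion(streams, 2, 5000);
streams.close();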
@@ -62,6 +62,8 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * Class that provides support for a series of benchmarks. It is usually driven by
  * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
@@ -334,7 +336,7 @@ public class SimpleBenchmark {
         consumer.seekToBeginning(partitions);

         while (true) {
-            final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+            final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
             if (records.isEmpty()) {
                 if (processedRecords == numRecords) {
                     break;
@@ -372,7 +374,7 @@ public class SimpleBenchmark {
         consumer.seekToBeginning(partitions);

         while (true) {
-            final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+            final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
             if (records.isEmpty()) {
                 if (processedRecords == numRecords) {
                     break;
@@ -79,6 +79,14 @@ public class GlobalStateManagerImplTest {
     private final TopicPartition t2 = new TopicPartition("t2", 1);
     private final TopicPartition t3 = new TopicPartition("t3", 1);
     private final TopicPartition t4 = new TopicPartition("t4", 1);
+
+    private final GlobalStateManagerImpl.IsRunning alwaysRunning = new GlobalStateManagerImpl.IsRunning() {
+        @Override
+        public boolean check() {
+            return true;
+        }
+    };
+
     private GlobalStateManagerImpl stateManager;
     private StateDirectory stateDirectory;
     private StreamsConfig streamsConfig;
@@ -119,7 +127,8 @@ public class GlobalStateManagerImplTest {
             consumer,
             stateDirectory,
             stateRestoreListener,
-            streamsConfig);
+            streamsConfig,
+            alwaysRunning);
         processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
         stateManager.setGlobalProcessorContext(processorContext);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
@@ -496,12 +505,20 @@ public class GlobalStateManagerImplTest {

     @Test
     public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() {
-        stateManager = new GlobalStateManagerImpl(new LogContext("mock"), topology, consumer, new StateDirectory(streamsConfig, time) {
-            @Override
-            public boolean lockGlobalState() throws IOException {
-                throw new IOException("KABOOM!");
-            }
-        }, stateRestoreListener, streamsConfig);
+        stateManager = new GlobalStateManagerImpl(
+            new LogContext("mock"),
+            topology,
+            consumer,
+            new StateDirectory(streamsConfig, time) {
+                @Override
+                public boolean lockGlobalState() throws IOException {
+                    throw new IOException("KABOOM!");
+                }
+            },
+            stateRestoreListener,
+            streamsConfig,
+            alwaysRunning
+        );

         try {
             stateManager.initialize();
@@ -538,7 +555,8 @@ public class GlobalStateManagerImplTest {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig);
+                streamsConfig,
+                alwaysRunning);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
@@ -571,7 +589,8 @@ public class GlobalStateManagerImplTest {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig);
+                streamsConfig,
+                alwaysRunning);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
@@ -63,6 +63,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;

 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -189,7 +190,7 @@ public class StandbyTaskTest {
         }

         restoreStateConsumer.seekToBeginning(partition);
-        task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
+        task.update(partition2, poll(restoreStateConsumer, 100).records(partition2));

         StandbyContextImpl context = (StandbyContextImpl) task.context();
         MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
@@ -246,7 +247,7 @@ public class StandbyTaskTest {
         }

         // The commit offset is at 0L. Records should not be processed
-        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
+        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, poll(restoreStateConsumer, 100).records(globalTopicPartition));
         assertEquals(5, remaining.size());

         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));
@@ -42,6 +42,8 @@ import java.util.Locale;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class BrokerCompatibilityTest {

     private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
@@ -153,7 +155,7 @@ public class BrokerCompatibilityTest {
         consumer.subscribe(Collections.singletonList(SINK_TOPIC));

         while (true) {
-            final ConsumerRecords<String, String> records = consumer.poll(100);
+            final ConsumerRecords<String, String> records = poll(consumer, 100);
             for (final ConsumerRecord<String, String> record : records) {
                 if (record.key().equals("key") && record.value().equals("1")) {
                     return;
@@ -52,6 +52,8 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class EosTestDriver extends SmokeTestUtil {

     private static final int MAX_NUMBER_OF_KEYS = 100;
@@ -254,7 +256,7 @@ public class EosTestDriver extends SmokeTestUtil {
                 topics.add("repartition");
             }
             consumer.subscribe(topics);
-            consumer.poll(0);
+            poll(consumer, 0);

             final Set<TopicPartition> partitions = new HashSet<>();
             for (final String topic : topics) {
@@ -284,7 +286,7 @@ public class EosTestDriver extends SmokeTestUtil {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         boolean allRecordsReceived = false;
         while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> receivedRecords = poll(consumer, 100);

             for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
@@ -591,7 +593,7 @@ public class EosTestDriver extends SmokeTestUtil {

         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> records = poll(consumer, 100);
             if (records.isEmpty()) {
                 System.out.println("No data received.");
                 for (final TopicPartition tp : partitions) {
@@ -47,6 +47,8 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class SmokeTestDriver extends SmokeTestUtil {

     public static final int MAX_RECORD_EMPTY_RETRIES = 60;
@@ -289,7 +291,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         int retry = 0;
         final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-            ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+            ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)
                     && verifyMax(max, allData, false)
@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;

@@ -74,7 +75,7 @@ public class StreamsResetterTest {

         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(3, records.count());
     }

@@ -90,7 +91,7 @@ public class StreamsResetterTest {

         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }

@@ -106,7 +107,7 @@ public class StreamsResetterTest {

         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }

@@ -122,7 +123,7 @@ public class StreamsResetterTest {

         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }

@@ -138,7 +139,7 @@ public class StreamsResetterTest {

         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(5, records.count());
     }

@@ -154,7 +155,7 @@ public class StreamsResetterTest {

         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }

@@ -172,7 +173,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 3L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }

@@ -190,7 +191,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 1L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }

@@ -208,7 +209,7 @@ public class StreamsResetterTest {
         topicPartitionsAndOffset.put(topicPartition, 5L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }

@@ -226,7 +227,7 @@ public class StreamsResetterTest {
         intermediateTopicPartitions.add(topicPartition);
         streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }

@@ -309,7 +309,13 @@ public class TopologyTestDriver implements Closeable {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig);
+                streamsConfig,
+                new GlobalStateManagerImpl.IsRunning() {
+                    @Override
+                    public boolean check() {
+                        return true;
+                    }
+                });

             final GlobalProcessorContextImpl globalProcessorContext
                 = new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);