KAFKA-5697: issue Consumer#wakeup during Streams shutdown

Wake up the consumers during shutdown to break them out of any internal blocking calls.

Semantically, it is safe to treat a WakeupException as "no work to do": the threads' polling loops simply continue, soon discover that they have been asked to shut down, and then shut down gracefully.
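For illustration, the pattern reduces to the sketch below (the class and method names here are illustrative stand-ins; the actual helper this patch adds is ConsumerUtils.poll, shown in the diff further down): a WakeupException thrown from a blocking poll is translated into an empty batch, so the caller's loop runs its normal bookkeeping and notices the pending shutdown on its own.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

final class WakeupTolerantPoll {
    static <K, V> ConsumerRecords<K, V> pollOrEmpty(final Consumer<K, V> consumer, final long timeoutMs) {
        try {
            return consumer.poll(timeoutMs);
        } catch (final WakeupException e) {
            // treat the wakeup as "no work to do"; the polling loop will observe the shutdown state
            return ConsumerRecords.empty();
        }
    }
}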

The existing tests should be sufficient to verify no regressions.

Author: John Roesler <john@confluent.io>

Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #4930 from vvcephei/streams-client-wakeup-on-shutdown

minor javadoc updates
John Roesler authored on 2018-05-04 09:02:50 -07:00, committed by Guozhang Wang
parent af983267be
commit 2d8049b713
18 changed files with 315 additions and 59 deletions

View File

@@ -67,6 +67,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -377,12 +378,20 @@ public class KafkaStreams {
     }

     /**
-     * Get read-only handle on global metrics registry.
+     * Get read-only handle on global metrics registry, including streams client's own metrics plus
+     * its embedded consumer clients' metrics.
      *
      * @return Map of all metrics.
      */
+    // TODO: we can add metrics for producer and admin client as well
     public Map<MetricName, ? extends Metric> metrics() {
-        return Collections.unmodifiableMap(metrics.metrics());
+        final Map<MetricName, Metric> result = new LinkedHashMap<>();
+        for (final StreamThread thread : threads) {
+            result.putAll(thread.consumerMetrics());
+        }
+        if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics());
+        result.putAll(metrics.metrics());
+        return Collections.unmodifiableMap(result);
     }

     /**

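With this change, KafkaStreams#metrics() also surfaces the embedded consumers' metrics, which is what the new waitForCompletion test utility later in this diff relies on. As a hedged caller-side sketch (not part of this patch), summing the per-partition consumer lag could look like:

import org.apache.kafka.common.Metric;
import org.apache.kafka.streams.KafkaStreams;

final class LagInspector {
    // Sum the per-partition "records-lag" consumer metrics now exposed via streams.metrics().
    static double totalRecordsLag(final KafkaStreams streams) {
        double totalLag = 0.0;
        for (final Metric metric : streams.metrics().values()) {
            if (metric.metricName().name().equals("records-lag")) {
                totalLag += ((Number) metric.metricValue()).doubleValue();
            }
        }
        return totalLag;
    }
}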
View File

@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;

public class ShutdownException extends StreamsException {
    public ShutdownException(final String message) {
        super(message);
    }

    public ShutdownException(final String message, final Throwable throwable) {
        super(message, throwable);
    }

    public ShutdownException(final Throwable throwable) {
        super(throwable);
    }
}

View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Collections;
import java.util.List;

public final class ConsumerUtils {
    private ConsumerUtils() {}

    public static <K, V> ConsumerRecords<K, V> poll(final Consumer<K, V> consumer, final long maxDurationMs) {
        try {
            return consumer.poll(maxDurationMs);
        } catch (final WakeupException e) {
            return new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<K, V>>>emptyMap());
        }
    }
}

View File

@@ -23,12 +23,14 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -46,6 +48,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * This class is responsible for the initialization, restoration, closing, flushing etc
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
@@ -60,13 +64,15 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
     private InternalProcessorContext processorContext;
     private final int retries;
     private final long retryBackoffMs;
+    private final IsRunning isRunning;

     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
                                   final Consumer<byte[], byte[]> globalConsumer,
                                   final StateDirectory stateDirectory,
                                   final StateRestoreListener stateRestoreListener,
-                                  final StreamsConfig config) {
+                                  final StreamsConfig config,
+                                  final IsRunning isRunning) {
         super(stateDirectory.globalStateDir());
         this.log = logContext.logger(GlobalStateManagerImpl.class);
@@ -76,6 +82,11 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
         this.stateRestoreListener = stateRestoreListener;
         this.retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
         this.retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
+        this.isRunning = isRunning;
+    }
+
+    public interface IsRunning {
+        boolean check();
     }

     @Override
@ -200,6 +211,13 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
try { try {
partitionInfos = globalConsumer.partitionsFor(sourceTopic); partitionInfos = globalConsumer.partitionsFor(sourceTopic);
break; break;
} catch (final WakeupException wakeupException) {
if (isRunning.check()) {
// note we may decide later that this condition is ok and just let the retry loop continue
throw new IllegalStateException("Got unexpected WakeupException during initialization.", wakeupException);
} else {
throw new ShutdownException("Shutting down from fetching partitions");
}
} catch (final TimeoutException retryableException) { } catch (final TimeoutException retryableException) {
if (++attempts > retries) { if (++attempts > retries) {
log.error("Failed to get partitions for topic {} after {} retry attempts due to timeout. " + log.error("Failed to get partitions for topic {} after {} retry attempts due to timeout. " +
@ -250,19 +268,20 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements Glob
long offset = globalConsumer.position(topicPartition); long offset = globalConsumer.position(topicPartition);
final Long highWatermark = highWatermarks.get(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition);
BatchingStateRestoreCallback final BatchingStateRestoreCallback stateRestoreAdapter =
stateRestoreAdapter = (BatchingStateRestoreCallback) ((stateRestoreCallback instanceof BatchingStateRestoreCallback)
(BatchingStateRestoreCallback) ((stateRestoreCallback instanceof ? stateRestoreCallback
BatchingStateRestoreCallback) : new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
? stateRestoreCallback
: new WrappedBatchingStateRestoreCallback(stateRestoreCallback));
stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
long restoreCount = 0L; long restoreCount = 0L;
while (offset < highWatermark) { while (offset < highWatermark) {
if (!isRunning.check()) {
throw new ShutdownException("Streams is not running (any more)");
}
try { try {
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(100); final ConsumerRecords<byte[], byte[]> records = poll(globalConsumer, 100);
final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>(); final List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> record : records) { for (ConsumerRecord<byte[], byte[]> record : records) {
if (record.key() != null) { if (record.key() != null) {

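Taken together, the two hunks above follow one guard pattern: a blocking consumer call is wrapped so that a wakeup is either an error (still running) or a clean shutdown signal (no longer running). A condensed sketch of that pattern, with stand-in names for everything except WakeupException:

import java.util.concurrent.Callable;

import org.apache.kafka.common.errors.WakeupException;

final class ShutdownAwareCall {
    // mirrors GlobalStateManagerImpl.IsRunning from the diff above
    interface IsRunning {
        boolean check();
    }

    static <T> T callOrShutdown(final Callable<T> blockingCall, final IsRunning isRunning) throws Exception {
        try {
            return blockingCall.call();
        } catch (final WakeupException e) {
            if (isRunning.check()) {
                // woken while still running: unexpected, fail loudly
                throw new IllegalStateException("Unexpected wakeup while running", e);
            }
            // woken because shutdown was requested: abort the blocked work cleanly
            // (stands in for the ShutdownException introduced by this patch)
            throw new RuntimeException("Shutting down", e);
        }
    }
}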
View File

@@ -20,6 +20,8 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
@@ -27,6 +29,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.ShutdownException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@@ -35,10 +38,12 @@ import org.slf4j.Logger;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.DEAD;
 import static org.apache.kafka.streams.processor.internals.GlobalStreamThread.State.PENDING_SHUTDOWN;
@@ -103,6 +108,10 @@ public class GlobalStreamThread extends Thread {
             return equals(RUNNING);
         }

+        public boolean isStarting() {
+            return equals(CREATED);
+        }
+
         @Override
         public boolean isValidTransition(final ThreadStateTransitionValidator newState) {
             final State tmpState = (State) newState;
@@ -170,6 +179,12 @@ public class GlobalStreamThread extends Thread {
         }
     }

+    private boolean stillStarting() {
+        synchronized (stateLock) {
+            return state.isStarting();
+        }
+    }
+
     public GlobalStreamThread(final ProcessorTopology topology,
                               final StreamsConfig config,
                               final Consumer<byte[], byte[]> globalConsumer,
@@ -232,7 +247,7 @@ public class GlobalStreamThread extends Thread {
         void pollAndUpdate() {
             try {
-                final ConsumerRecords<byte[], byte[]> received = globalConsumer.poll(pollMs);
+                final ConsumerRecords<byte[], byte[]> received = poll(globalConsumer, pollMs);
                 for (final ConsumerRecord<byte[], byte[]> record : received) {
                     stateMaintainer.update(record);
                 }
@@ -263,7 +278,19 @@ public class GlobalStreamThread extends Thread {
     @Override
     public void run() {
-        final StateConsumer stateConsumer = initialize();
+        final StateConsumer stateConsumer;
+        try {
+            stateConsumer = initialize();
+        } catch (final ShutdownException e) {
+            log.info("Shutting down from initialization");
+            // Almost certainly, we arrived here because the state is already PENDING_SHUTDOWN, but it's harmless to
+            // just make sure
+            setState(State.PENDING_SHUTDOWN);
+            setState(State.DEAD);
+            streamsMetrics.removeAllThreadLevelSensors();
+            log.info("Shutdown complete");
+            return;
+        }

         if (stateConsumer == null) {
             // during initialization, the caller thread would wait for the state consumer
@@ -275,6 +302,7 @@ public class GlobalStreamThread extends Thread {
             setState(State.DEAD);

             log.warn("Error happened during initialization of the global state store; this thread has shutdown");
+            streamsMetrics.removeAllThreadLevelSensors();

             return;
         }
@@ -314,7 +342,14 @@ public class GlobalStreamThread extends Thread {
                 globalConsumer,
                 stateDirectory,
                 stateRestoreListener,
-                config);
+                config,
+                new GlobalStateManagerImpl.IsRunning() {
+                    @Override
+                    public boolean check() {
+                        return stillStarting() || stillRunning();
+                    }
+                }
+            );

             final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(
                 config,
@@ -367,5 +402,10 @@ public class GlobalStreamThread extends Thread {
         // one could call shutdown() multiple times, so ignore subsequent calls
         // if already shutting down or dead
         setState(PENDING_SHUTDOWN);
+        globalConsumer.wakeup();
+    }
+
+    public Map<MetricName, Metric> consumerMetrics() {
+        return Collections.unmodifiableMap(globalConsumer.metrics());
     }
 }

View File

@@ -40,6 +40,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class StoreChangelogReader implements ChangelogReader {

     private final Logger log;
@@ -81,7 +83,7 @@ public class StoreChangelogReader implements ChangelogReader {
         final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
         try {
-            final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
+            final ConsumerRecords<byte[], byte[]> allRecords = poll(restoreConsumer, 10);
             for (final TopicPartition partition : restoringPartitions) {
                 restorePartition(allRecords, partition, active.restoringTaskFor(partition));
             }

View File

@@ -26,6 +26,8 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
@@ -54,6 +56,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -62,6 +65,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;

 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;

 public class StreamThread extends Thread {
@@ -824,7 +828,7 @@ public class StreamThread extends Thread {
         ConsumerRecords<byte[], byte[]> records = null;

         try {
-            records = consumer.poll(pollTimeMs);
+            records = poll(consumer, pollTimeMs);
         } catch (final InvalidOffsetException e) {
             resetInvalidOffsets(e);
         }
@@ -1051,7 +1055,7 @@
         }

         try {
-            final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(0);
+            final ConsumerRecords<byte[], byte[]> records = poll(restoreConsumer, 0);

             if (!records.isEmpty()) {
                 for (final TopicPartition partition : records.partitions()) {
@@ -1116,6 +1120,8 @@ public class StreamThread extends Thread {
     public void shutdown() {
         log.info("Informed to shut down");
         final State oldState = setState(State.PENDING_SHUTDOWN);
+        consumer.wakeup();
+        restoreConsumer.wakeup();
         if (oldState == State.CREATED) {
             // The thread may not have been started. Take responsibility for shutting down
             completeShutdown(true);
@@ -1210,4 +1216,13 @@ public class StreamThread extends Thread {
     Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords() {
         return standbyRecords;
     }
+
+    public Map<MetricName, Metric> consumerMetrics() {
+        final Map<MetricName, ? extends Metric> consumerMetrics = consumer.metrics();
+        final Map<MetricName, ? extends Metric> restoreConsumerMetrics = restoreConsumer.metrics();
+        final LinkedHashMap<MetricName, Metric> result = new LinkedHashMap<>();
+        result.putAll(consumerMetrics);
+        result.putAll(restoreConsumerMetrics);
+        return result;
+    }
 }

View File

@@ -18,8 +18,6 @@ package org.apache.kafka.streams;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.config.ConfigException;
@@ -43,6 +41,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -232,7 +231,43 @@ public class KafkaStreamsTest {
         streams.close();
         assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING);
     }

+    @Ignore // this test cannot pass as long as GST blocks KS.start()
+    @Test
+    public void testGlobalThreadCloseWithoutConnectingToBroker() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
+        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        // make sure we have the global state thread running too
+        builder.globalTable("anyTopic");
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.start();
+        streams.close();
+        // There's nothing to assert... We're testing that this operation actually completes.
+    }
+
+    @Test
+    public void testLocalThreadCloseWithoutConnectingToBroker() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:1");
+        props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        // use a local (non-global) table, so only the stream threads are running
+        builder.table("anyTopic");
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        streams.start();
+        streams.close();
+        // There's nothing to assert... We're testing that this operation actually completes.
+    }
@@ -327,16 +362,6 @@ public class KafkaStreamsTest {
         }
     }

-    @Test
-    public void testNumberDefaultMetrics() {
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "1");
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
-        final Map<MetricName, ? extends Metric> metrics = streams.metrics();
-        // all 22 default StreamThread metrics + 1 metric that keeps track of number of metrics
-        assertEquals(23, metrics.size());
-    }
-
     @Test
     public void testIllegalMetricsConfig() {
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");

View File

@@ -55,6 +55,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;

+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -118,8 +119,9 @@ public class InternalTopicIntegrationTest {
         final Map<String, Properties> topicConfigs = scala.collection.JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());

         for (Map.Entry<String, Properties> topicConfig : topicConfigs.entrySet()) {
-            if (topicConfig.getKey().equals(changelog))
+            if (topicConfig.getKey().equals(changelog)) {
                 return topicConfig.getValue();
+            }
         }

         return new Properties();
@@ -157,6 +159,7 @@ public class InternalTopicIntegrationTest {
         //
         // Step 3: Verify the state changelog topics are compact
         //
+        waitForCompletion(streams, 2, 5000);
         streams.close();

         final Properties changelogProps = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "Counts"));
@@ -201,6 +204,7 @@ public class InternalTopicIntegrationTest {
         //
         // Step 3: Verify the state changelog topics are compact
         //
+        waitForCompletion(streams, 2, 5000);
         streams.close();

         final Properties properties = getTopicProperties(ProcessorStateManager.storeChangelogTopic(appID, "CountWindows"));
         final List<String> policies = Arrays.asList(properties.getProperty(LogConfig.CleanupPolicyProp()).split(","));

View File

@@ -28,10 +28,12 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.requests.UpdateMetadataRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.test.TestCondition;
@@ -49,6 +51,8 @@ import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * Utility functions to make integration testing more convenient.
  */
@@ -158,6 +162,40 @@ public class IntegrationTestUtils {
         produceKeyValuesSynchronously(topic, keyedRecords, producerConfig, time, enableTransactions);
     }

+    /**
+     * Wait for streams to "finish", based on the consumer lag metric.
+     *
+     * Caveats:
+     * - Inputs must be finite, fully loaded, and flushed before this method is called
+     * - expectedPartitions is the total number of partitions to watch the lag on, including both input and internal.
+     *   It's somewhat ok to get this wrong, as the main failure case would be an immediate return due to the clients
+     *   not being initialized, which you can avoid with any non-zero value. But it's probably better to get it right ;)
+     */
+    public static void waitForCompletion(final KafkaStreams streams,
+                                         final int expectedPartitions,
+                                         final int timeoutMilliseconds) {
+        final long start = System.currentTimeMillis();
+        while (true) {
+            int lagMetrics = 0;
+            double totalLag = 0.0;
+            for (final Metric metric : streams.metrics().values()) {
+                if (metric.metricName().name().equals("records-lag")) {
+                    lagMetrics++;
+                    totalLag += ((Number) metric.metricValue()).doubleValue();
+                }
+            }
+            if (lagMetrics >= expectedPartitions && totalLag == 0.0) {
+                return;
+            }
+            if (System.currentTimeMillis() - start >= timeoutMilliseconds) {
+                throw new RuntimeException(String.format(
+                    "Timed out waiting for completion. lagMetrics=[%s/%s] totalLag=[%s]",
+                    lagMetrics, expectedPartitions, totalLag
+                ));
+            }
+        }
+    }
+
     public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig,
                                                                                   final String topic,
                                                                                   final int expectedNumRecords) throws InterruptedException {
@@ -352,7 +390,7 @@ public class IntegrationTestUtils {
         while (totalPollTimeMs < waitTime &&
             continueConsuming(consumedValues.size(), maxMessages)) {
             totalPollTimeMs += pollIntervalMs;
-            final ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs);
+            final ConsumerRecords<K, V> records = poll(consumer, pollIntervalMs);

             for (final ConsumerRecord<K, V> record : records) {
                 consumedValues.add(new KeyValue<>(record.key(), record.value()));

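A hedged usage sketch for the new helper, mirroring the waitForCompletion(streams, 2, 5000) calls in InternalTopicIntegrationTest above (the partition count of 2 is topology-dependent; here it is assumed to be one input partition plus one internal changelog partition):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;

final class WaitForCompletionUsage {
    static void drainAndClose(final KafkaStreams streams) {
        // precondition (see the javadoc caveats): all test input is already produced and flushed
        IntegrationTestUtils.waitForCompletion(streams, 2, 5000);
        streams.close();
    }
}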
View File

@@ -62,6 +62,8 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 /**
  * Class that provides support for a series of benchmarks. It is usually driven by
  * tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py.
@@ -334,7 +336,7 @@ public class SimpleBenchmark {
         consumer.seekToBeginning(partitions);

         while (true) {
-            final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+            final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
             if (records.isEmpty()) {
                 if (processedRecords == numRecords) {
                     break;
@@ -372,7 +374,7 @@ public class SimpleBenchmark {
         consumer.seekToBeginning(partitions);

         while (true) {
-            final ConsumerRecords<Integer, byte[]> records = consumer.poll(POLL_MS);
+            final ConsumerRecords<Integer, byte[]> records = poll(consumer, POLL_MS);
             if (records.isEmpty()) {
                 if (processedRecords == numRecords) {
                     break;

View File

@@ -79,6 +79,14 @@ public class GlobalStateManagerImplTest {
     private final TopicPartition t2 = new TopicPartition("t2", 1);
     private final TopicPartition t3 = new TopicPartition("t3", 1);
     private final TopicPartition t4 = new TopicPartition("t4", 1);
+    private final GlobalStateManagerImpl.IsRunning alwaysRunning = new GlobalStateManagerImpl.IsRunning() {
+        @Override
+        public boolean check() {
+            return true;
+        }
+    };
+
     private GlobalStateManagerImpl stateManager;
     private StateDirectory stateDirectory;
     private StreamsConfig streamsConfig;
@@ -119,7 +127,8 @@ public class GlobalStateManagerImplTest {
             consumer,
             stateDirectory,
             stateRestoreListener,
-            streamsConfig);
+            streamsConfig,
+            alwaysRunning);
         processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
         stateManager.setGlobalProcessorContext(processorContext);
         checkpointFile = new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME);
@@ -496,12 +505,20 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() {
-        stateManager = new GlobalStateManagerImpl(new LogContext("mock"), topology, consumer, new StateDirectory(streamsConfig, time) {
-            @Override
-            public boolean lockGlobalState() throws IOException {
-                throw new IOException("KABOOM!");
-            }
-        }, stateRestoreListener, streamsConfig);
+        stateManager = new GlobalStateManagerImpl(
+            new LogContext("mock"),
+            topology,
+            consumer,
+            new StateDirectory(streamsConfig, time) {
+                @Override
+                public boolean lockGlobalState() throws IOException {
+                    throw new IOException("KABOOM!");
+                }
+            },
+            stateRestoreListener,
+            streamsConfig,
+            alwaysRunning
+        );

         try {
             stateManager.initialize();
@@ -538,7 +555,8 @@ public class GlobalStateManagerImplTest {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig);
+                streamsConfig,
+                alwaysRunning);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }
@@ -571,7 +589,8 @@ public class GlobalStateManagerImplTest {
                 consumer,
                 stateDirectory,
                 stateRestoreListener,
-                streamsConfig);
+                streamsConfig,
+                alwaysRunning);
         } catch (final StreamsException expected) {
             assertEquals(numberOfCalls.get(), retries);
         }

View File

@@ -63,6 +63,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;

 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -189,7 +190,7 @@ public class StandbyTaskTest {
         }

         restoreStateConsumer.seekToBeginning(partition);
-        task.update(partition2, restoreStateConsumer.poll(100).records(partition2));
+        task.update(partition2, poll(restoreStateConsumer, 100).records(partition2));

         StandbyContextImpl context = (StandbyContextImpl) task.context();
         MockStateStore store1 = (MockStateStore) context.getStateMgr().getStore(storeName1);
@@ -246,7 +247,7 @@ public class StandbyTaskTest {
         }

         // The commit offset is at 0L. Records should not be processed
-        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, restoreStateConsumer.poll(100).records(globalTopicPartition));
+        List<ConsumerRecord<byte[], byte[]>> remaining = task.update(globalTopicPartition, poll(restoreStateConsumer, 100).records(globalTopicPartition));
         assertEquals(5, remaining.size());

         committedOffsets.put(new TopicPartition(globalTopicPartition.topic(), globalTopicPartition.partition()), new OffsetAndMetadata(10L));

View File

@@ -42,6 +42,8 @@ import java.util.Locale;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class BrokerCompatibilityTest {

     private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
@@ -153,7 +155,7 @@ public class BrokerCompatibilityTest {
         consumer.subscribe(Collections.singletonList(SINK_TOPIC));

         while (true) {
-            final ConsumerRecords<String, String> records = consumer.poll(100);
+            final ConsumerRecords<String, String> records = poll(consumer, 100);
             for (final ConsumerRecord<String, String> record : records) {
                 if (record.key().equals("key") && record.value().equals("1")) {
                     return;

View File

@@ -52,6 +52,8 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class EosTestDriver extends SmokeTestUtil {

     private static final int MAX_NUMBER_OF_KEYS = 100;
@@ -254,7 +256,7 @@ public class EosTestDriver extends SmokeTestUtil {
             topics.add("repartition");
         }
         consumer.subscribe(topics);
-        consumer.poll(0);
+        poll(consumer, 0);

         final Set<TopicPartition> partitions = new HashSet<>();
         for (final String topic : topics) {
@@ -284,7 +286,7 @@ public class EosTestDriver extends SmokeTestUtil {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         boolean allRecordsReceived = false;
         while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> receivedRecords = poll(consumer, 100);

             for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
                 maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
@@ -591,7 +593,7 @@ public class EosTestDriver extends SmokeTestUtil {
         long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
         while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
+            final ConsumerRecords<byte[], byte[]> records = poll(consumer, 100);
             if (records.isEmpty()) {
                 System.out.println("No data received.");
                 for (final TopicPartition tp : partitions) {

View File

@@ -47,6 +47,8 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
+
 public class SmokeTestDriver extends SmokeTestUtil {

     public static final int MAX_RECORD_EMPTY_RETRIES = 60;
@@ -289,7 +291,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
         int retry = 0;
         final long start = System.currentTimeMillis();
         while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-            ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+            ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
             if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                 if (verifyMin(min, allData, false)
                     && verifyMax(max, allData, false)

View File

@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;

+import static org.apache.kafka.streams.processor.internals.ConsumerUtils.poll;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -74,7 +75,7 @@ public class StreamsResetterTest {
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(3, records.count());
     }
@@ -90,7 +91,7 @@
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 2L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
@@ -106,7 +107,7 @@
         streamsResetter.resetOffsetsTo(consumer, inputTopicPartitions, 4L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
@@ -122,7 +123,7 @@
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 3L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
@@ -138,7 +139,7 @@
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, -3L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(5, records.count());
     }
@@ -154,7 +155,7 @@
         streamsResetter.shiftOffsetsBy(consumer, inputTopicPartitions, 5L);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
@@ -172,7 +173,7 @@
         topicPartitionsAndOffset.put(topicPartition, 3L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
@@ -190,7 +191,7 @@
         topicPartitionsAndOffset.put(topicPartition, 1L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
@@ -208,7 +209,7 @@
         topicPartitionsAndOffset.put(topicPartition, 5L);
         streamsResetter.resetOffsetsFromResetPlan(consumer, inputTopicPartitions, topicPartitionsAndOffset);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }
@@ -226,7 +227,7 @@
         intermediateTopicPartitions.add(topicPartition);
         streamsResetter.maybeSeekToEnd("g1", consumer, intermediateTopicPartitions);

-        final ConsumerRecords<byte[], byte[]> records = consumer.poll(500);
+        final ConsumerRecords<byte[], byte[]> records = poll(consumer, 500);
         assertEquals(2, records.count());
     }

View File

@@ -309,7 +309,13 @@ public class TopologyTestDriver implements Closeable {
             consumer,
             stateDirectory,
             stateRestoreListener,
-            streamsConfig);
+            streamsConfig,
+            new GlobalStateManagerImpl.IsRunning() {
+                @Override
+                public boolean check() {
+                    return true;
+                }
+            });

         final GlobalProcessorContextImpl globalProcessorContext
             = new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache);