KAFKA-9274: Remove `retries` for global task (#9047)

- part of KIP-572
- removed the usage of `retries` in `GlobalStateManager`
- instead of `retries`, the new `task.timeout.ms` config is used

Reviewers: John Roesler <john@confluent.io>, Boyang Chen <boyang@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Author: Matthias J. Sax, 2020-08-05 14:14:18 -07:00 (committed by GitHub)
parent 26a217c8e7
commit b351493543
16 changed files with 804 additions and 206 deletions
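
For context, the user-facing effect of this change is that `retries` no longer influences the global task; how long transient broker errors are tolerated is instead bounded by `task.timeout.ms`. Below is a minimal, hypothetical sketch of an application configured accordingly (application id, bootstrap server, and topic name are placeholders, not taken from this patch):

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class TaskTimeoutExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-store-app");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // `retries` is ignored by the Streams runtime after KIP-572; instead, bound how long
        // a task (including the global task during restore) may stall on retriable errors:
        props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, Duration.ofMinutes(5).toMillis());

        final StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("reference-data"); // placeholder topic backing a global store

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```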


@ -162,7 +162,7 @@
files="StreamsMetricsImpl.java"/>
<suppress checks="NPathComplexity"
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager).java"/>
files="(KafkaStreams|StreamsPartitionAssignor|StreamThread|TaskManager|GlobalStateManagerImpl).java"/>
<suppress checks="(FinalLocalVariable|UnnecessaryParentheses|BooleanExpressionComplexity|CyclomaticComplexity|WhitespaceAfter|LocalVariableName)"
files="Murmur3.java"/>


@ -203,7 +203,7 @@
</tr>
<tr class="row-even"><td>commit.interval.ms</td>
<td>Low</td>
<td colspan="2">The frequency with which to save the position (offsets in source topics) of tasks.</td>
<td colspan="2">The frequency in milliseconds with which to save the position (offsets in source topics) of tasks.</td>
<td>30000 milliseconds</td>
</tr>
<tr class="row-odd"><td>default.deserialization.exception.handler</td>
@ -243,8 +243,8 @@
</tr>
<tr class="row-even"><td>max.task.idle.ms</td>
<td>Medium</td>
<td colspan="2">Maximum amount of time a stream task will stay idle while waiting for all partitions to contain data and avoid potential out-of-order record
processing across multiple input streams.</td>
<td colspan="2">Maximum amount of time in milliseconds a stream task will stay idle while waiting for all partitions to contain data
and avoid potential out-of-order record processing across multiple input streams.</td>
<td>0 milliseconds</td>
</tr>
<tr class="row-odd"><td>max.warmup.replicas</td>
@ -269,8 +269,8 @@
</tr>
<tr class="row-odd"><td>metrics.sample.window.ms</td>
<td>Low</td>
<td colspan="2">The window of time a metrics sample is computed over.</td>
<td>30000 milliseconds</td>
<td colspan="2">The window of time in milliseconds a metrics sample is computed over.</td>
<td>30000 milliseconds (30 seconds)</td>
</tr>
<tr class="row-even"><td>num.standby.replicas</td>
<td>Medium</td>
@ -289,7 +289,7 @@
</tr>
<tr class="row-odd"><td>probing.rebalance.interval.ms</td>
<td>Low</td>
<td colspan="2">The maximum time to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td>
<td colspan="2">The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td>
<td>600000 milliseconds (10 minutes)</td>
</tr>
<tr class="row-even"><td>processing.guarantee</td>
@ -308,15 +308,10 @@
<td colspan="2">The replication factor for changelog topics and repartition topics created by the application.</td>
<td>1</td>
</tr>
<tr class="row-odd"><td>retries</td>
<td>Medium</td>
<td colspan="2">The number of retries for broker requests that return a retryable error. </td>
<td>0</td>
</tr>
<tr class="row-even"><td>retry.backoff.ms</td>
<td>Medium</td>
<td colspan="2">The amount of time in milliseconds, before a request is retried. This applies if the <code class="docutils literal"><span class="pre">retries</span></code> parameter is configured to be greater than 0. </td>
<td>100</td>
<td>100 milliseconds</td>
</tr>
<tr class="row-odd"><td>rocksdb.config.setter</td>
<td>Medium</td>
@ -326,13 +321,18 @@
<tr class="row-even"><td>state.cleanup.delay.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
<td>600000 milliseconds</td>
<td>600000 milliseconds (10 minutes)</td>
</tr>
<tr class="row-odd"><td>state.dir</td>
<td>High</td>
<td colspan="2">Directory location for state stores.</td>
<td><code class="docutils literal"><span class="pre">/tmp/kafka-streams</span></code></td>
</tr>
<tr class="row-odd"><td>task.timeout.ms</td>
<td>Medium</td>
<td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td>
<td>300000 milliseconds (5 minutes)</td>
</tr>
<tr class="row-even"><td>topology.optimization</td>
<td>Medium</td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology</td>
@ -346,7 +346,7 @@
<tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
<td>Low</td>
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td>86400000 milliseconds = 1 day</td>
<td>86400000 milliseconds (1 day)</td>
</tr>
</tbody>
</table>


@ -95,11 +95,12 @@
</p>
<p>
The configuration parameter <code>retries</code> is deprecated in favor of a the new parameter <code>task.timeout.ms</code>.
Kafka Streams runtime ignores <code>retries</code> if set, however, if would still forward the parameter
to it's internal clients. Note though, that <code>retries</code> is deprecated for the producer and admin client, too.
Thus, instead of setting <code>retries</code>, you should configure the corresponding client timeouts.
The configuration parameter <code>retries</code> is deprecated in favor of the new parameter <code>task.timeout.ms</code>.
Kafka Streams' runtime ignores <code>retries</code> if set, however, it would still forward the parameter
to its internal clients. Note though, that <code>retries</code> is deprecated for the producer and admin client, too.
Thus, instead of setting <code>retries</code>, you should configure the corresponding client timeouts, namely
<code>delivery.timeout.ms</code> and <code>max.block.ms</code> for the producer and
<code>default.api.timeout.ms</code> for the admin client.
</p>
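
For applications that previously tuned `retries` for the embedded clients, the client-level timeouts named above can be passed through the Streams configuration using the existing client prefixes. A hedged sketch of that wiring (all values are illustrative, not recommendations from this patch):

```java
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ClientTimeoutConfigExample {
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timeout-config-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        // Producer: bound the end-to-end send time and how long send()/metadata lookups may block.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 120_000);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 60_000);

        // Admin client: bound how long admin API calls (e.g. internal topic creation) may take.
        props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 60_000);

        return props;
    }
}
```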
<h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>


@ -23,8 +23,8 @@
<ul>
<li>The configuration parameter <code>retries</code> is deprecated for the producer, admin, and Kafka Streams clients
via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams">KIP-572</a>.
You should use the producer's <code>delivery.timeout.ms</code>, admin's <code>default.api.timeout.ms</code>, and
Kafka Streams' new <code>task.timeout.ms</code> parameters instead.
You should use the producer's <code>delivery.timeout.ms</code> and <code>max.block.ms</code>, admin's
<code>default.api.timeout.ms</code>, and Kafka Streams' new <code>task.timeout.ms</code> parameters instead.
Note that parameter <code>retry.backoff.ms</code> is not impacted by this change.
</li>
</ul>


@ -357,7 +357,7 @@ public class StreamsConfig extends AbstractConfig {
/** {@code commit.interval.ms} */
@SuppressWarnings("WeakerAccess")
public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor." +
private static final String COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds with which to save the position of the processor." +
" (Note, if <code>processing.guarantee</code> is set to <code>" + EXACTLY_ONCE + "</code>, the default value is <code>" + EOS_DEFAULT_COMMIT_INTERVAL_MS + "</code>," +
" otherwise the default value is <code>" + DEFAULT_COMMIT_INTERVAL_MS + "</code>.";
@ -408,7 +408,7 @@ public class StreamsConfig extends AbstractConfig {
/** {@code max.task.idle.ms} */
public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
private static final String MAX_TASK_IDLE_MS_DOC = "Maximum amount of time a stream task will stay idle when not all of its partition buffers contain records," +
private static final String MAX_TASK_IDLE_MS_DOC = "Maximum amount of time in milliseconds a stream task will stay idle when not all of its partition buffers contain records," +
" to avoid potential out-of-order record processing across multiple input streams.";
/** {@code max.warmup.replicas} */
@ -454,8 +454,8 @@ public class StreamsConfig extends AbstractConfig {
/** {@code probing.rebalance.interval.ms} */
public static final String PROBING_REBALANCE_INTERVAL_MS_CONFIG = "probing.rebalance.interval.ms";
private static final String PROBING_REBALANCE_INTERVAL_MS_DOC = "The maximum time to wait before triggering a rebalance to probe for warmup replicas that have finished warming up and are ready to become active. Probing rebalances " +
"will continue to be triggered until the assignment is balanced. Must be at least 1 minute.";
private static final String PROBING_REBALANCE_INTERVAL_MS_DOC = "The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have finished warming up and are ready to become active." +
" Probing rebalances will continue to be triggered until the assignment is balanced. Must be at least 1 minute.";
/** {@code processing.guarantee} */
@SuppressWarnings("WeakerAccess")
@ -529,7 +529,7 @@ public class StreamsConfig extends AbstractConfig {
/** {@code task.timeout.ms} */
public static final String TASK_TIMEOUT_MS_CONFIG = "task.timeout.ms";
public static final String TASK_TIMEOUT_MS_DOC = "Max amount of time a task might stall due to internal errors and retries until an error is raised. " +
public static final String TASK_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. " +
"For a timeout of 0ms, a task would raise an error for the first internal error. " +
"For any timeout larger than 0ms, a task will retry at least once before an error is raised.";


@ -1,33 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.internals;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Map;
/**
* A {@link StreamsConfig} that does not log its configuration on construction.
*
* This producer cleaner output for unit tests using the {@code test-utils},
* since logging the config is not really valuable in this context.
*/
public class QuietStreamsConfig extends StreamsConfig {
public QuietStreamsConfig(final Map<?, ?> props) {
super(props, false);
}
}


@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
@ -30,6 +31,8 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
@ -39,12 +42,22 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientUtils {
private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
public static class QuietStreamsConfig extends StreamsConfig {
public QuietStreamsConfig(final Map<?, ?> props) {
super(props, false);
}
}
public static class QuietConsumerConfig extends ConsumerConfig {
public QuietConsumerConfig(final Map<String, Object> props) {
super(props, false);
}
}
public static final class QuietAdminClientConfig extends AdminClientConfig {
QuietAdminClientConfig(final StreamsConfig streamsConfig) {
// If you just want to look up admin configs, you don't care about the clientId
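
The new `Quiet*` wrappers let internal code resolve a single client config value without logging the full client configuration a second time. This is how the patched `GlobalStateManagerImpl` constructor (further down in this diff) obtains the global consumer's `request.timeout.ms`; the same steps are sketched here in isolation as a hypothetical helper:

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.ClientUtils;

final class RequestTimeoutLookup {

    // Resolves the global consumer's `request.timeout.ms` the same way the patched
    // GlobalStateManagerImpl constructor does, without logging the config a second time.
    static int globalConsumerRequestTimeoutMs(final StreamsConfig config) {
        final Map<String, Object> consumerProps = config.getGlobalConsumerConfigs("dummy");

        // ConsumerConfig validation requires the deserializer settings, so add them explicitly.
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return new ClientUtils.QuietConsumerConfig(consumerProps)
            .getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
    }
}
```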


@ -17,14 +17,16 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.FixedOrderMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
@ -48,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.streams.processor.internals.StateManagerUtil.converterForStore;
@ -57,7 +60,10 @@ import static org.apache.kafka.streams.processor.internals.StateManagerUtil.conv
* of Global State Stores. There is only ever 1 instance of this class per Application Instance.
*/
public class GlobalStateManagerImpl implements GlobalStateManager {
private final static long NO_DEADLINE = -1L;
private final Logger log;
private final Time time;
private final Consumer<byte[], byte[]> globalConsumer;
private final File baseDir;
private final StateDirectory stateDirectory;
@ -65,22 +71,22 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();
private final StateRestoreListener stateRestoreListener;
private InternalProcessorContext globalProcessorContext;
private final int retries;
private final long retryBackoffMs;
private final Duration pollTime;
private final Duration requestTimeoutPlusTaskTimeout;
private final long taskTimeoutMs;
private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
private final OffsetCheckpoint checkpointFile;
private final Map<TopicPartition, Long> checkpointFileCache;
private final Map<String, String> storeToChangelogTopic;
private final List<StateStore> globalStateStores;
@SuppressWarnings("deprecation") // TODO: remove in follow up PR when `RETRIES` is removed
public GlobalStateManagerImpl(final LogContext logContext,
final Time time,
final ProcessorTopology topology,
final Consumer<byte[], byte[]> globalConsumer,
final StateDirectory stateDirectory,
final StateRestoreListener stateRestoreListener,
final StreamsConfig config) {
this.time = time;
storeToChangelogTopic = topology.storeToChangelogTopic();
globalStateStores = topology.globalStateStores();
baseDir = stateDirectory.globalStateDir();
@ -98,9 +104,16 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
this.globalConsumer = globalConsumer;
this.stateDirectory = stateDirectory;
this.stateRestoreListener = stateRestoreListener;
retries = config.getInt(StreamsConfig.RETRIES_CONFIG);
retryBackoffMs = config.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG);
pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
final Map<String, Object> consumerProps = config.getGlobalConsumerConfigs("dummy");
// need to add mandatory configs; otherwise `QuietConsumerConfig` throws
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
final int requestTimeoutMs = new ClientUtils.QuietConsumerConfig(consumerProps)
.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
taskTimeoutMs = config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG);
requestTimeoutPlusTaskTimeout =
Duration.ofMillis(requestTimeoutMs + taskTimeoutMs);
}
@Override
@ -140,10 +153,13 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
// make sure each topic-partition from checkpointFileCache is associated with a global state store
checkpointFileCache.keySet().forEach(tp -> {
if (!changelogTopics.contains(tp.topic())) {
log.error("Encountered a topic-partition in the global checkpoint file not associated with any global" +
" state store, topic-partition: {}, checkpoint file: {}. If this topic-partition is no longer valid," +
" an application reset and state store directory cleanup will be required.",
tp.topic(), checkpointFile.toString());
log.error(
"Encountered a topic-partition in the global checkpoint file not associated with any global" +
" state store, topic-partition: {}, checkpoint file: {}. If this topic-partition is no longer valid," +
" an application reset and state store directory cleanup will be required.",
tp.topic(),
checkpointFile.toString()
);
try {
stateDirectory.unlockGlobalState();
} catch (final IOException e) {
@ -184,32 +200,15 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
log.info("Restoring state for global store {}", store.name());
final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
Map<TopicPartition, Long> highWatermarks = null;
int attempts = 0;
while (highWatermarks == null) {
try {
highWatermarks = globalConsumer.endOffsets(topicPartitions);
} catch (final TimeoutException retryableException) {
if (++attempts > retries) {
log.error("Failed to get end offsets for topic partitions of global store {} after {} retry attempts. " +
"You can increase the number of retries via configuration parameter `retries`.",
store.name(),
retries,
retryableException);
throw new StreamsException(String.format("Failed to get end offsets for topic partitions of global store %s after %d retry attempts. " +
"You can increase the number of retries via configuration parameter `retries`.", store.name(), retries),
retryableException);
}
log.debug("Failed to get end offsets for partitions {}, backing off for {} ms to retry (attempt {} of {})",
topicPartitions,
retryBackoffMs,
attempts,
retries,
retryableException);
Utils.sleep(retryBackoffMs);
}
}
final Map<TopicPartition, Long> highWatermarks = retryUntilSuccessOrThrowOnTaskTimeout(
() -> globalConsumer.endOffsets(topicPartitions),
String.format(
"Failed to get offsets for partitions %s. The broker may be transiently unavailable at the moment.",
topicPartitions
)
);
try {
restoreState(
stateRestoreCallback,
@ -226,35 +225,14 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
final String sourceTopic = storeToChangelogTopic.get(store.name());
List<PartitionInfo> partitionInfos;
int attempts = 0;
while (true) {
try {
partitionInfos = globalConsumer.partitionsFor(sourceTopic);
break;
} catch (final TimeoutException retryableException) {
if (++attempts > retries) {
log.error("Failed to get partitions for topic {} after {} retry attempts due to timeout. " +
"The broker may be transiently unavailable at the moment. " +
"You can increase the number of retries via configuration parameter `retries`.",
sourceTopic,
retries,
retryableException);
throw new StreamsException(String.format("Failed to get partitions for topic %s after %d retry attempts due to timeout. " +
"The broker may be transiently unavailable at the moment. " +
"You can increase the number of retries via configuration parameter `retries`.", sourceTopic, retries),
retryableException);
}
log.debug("Failed to get partitions for topic {} due to timeout. The broker may be transiently unavailable at the moment. " +
"Backing off for {} ms to retry (attempt {} of {})",
sourceTopic,
retryBackoffMs,
attempts,
retries,
retryableException);
Utils.sleep(retryBackoffMs);
}
}
final List<PartitionInfo> partitionInfos = retryUntilSuccessOrThrowOnTaskTimeout(
() -> globalConsumer.partitionsFor(sourceTopic),
String.format(
"Failed to get partitions for topic %s. The broker may be transiently unavailable at the moment.",
sourceTopic
)
);
if (partitionInfos == null || partitionInfos.isEmpty()) {
throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name()));
@ -274,14 +252,22 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
final RecordConverter recordConverter) {
for (final TopicPartition topicPartition : topicPartitions) {
globalConsumer.assign(Collections.singletonList(topicPartition));
long offset;
final Long checkpoint = checkpointFileCache.get(topicPartition);
if (checkpoint != null) {
globalConsumer.seek(topicPartition, checkpoint);
offset = checkpoint;
} else {
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
offset = retryUntilSuccessOrThrowOnTaskTimeout(
() -> globalConsumer.position(topicPartition),
String.format(
"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
topicPartition
)
);
}
long offset = globalConsumer.position(topicPartition);
final Long highWatermark = highWatermarks.get(topicPartition);
final RecordBatchingStateRestoreCallback stateRestoreAdapter =
StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
@ -289,15 +275,51 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
long restoreCount = 0L;
while (offset < highWatermark) {
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
while (offset < highWatermark) { // when we "fix" this loop (KAFKA-7380 / KAFKA-10317)
// we should update the `poll()` timeout below
// we ignore `poll.ms` config during bootstrapping phase and
// apply `request.timeout.ms` plus `task.timeout.ms` instead
//
// the reason is, that `poll.ms` might be too short to give a fetch request a fair chance
// to actually complete and we don't want to start `task.timeout.ms` too early
//
// we also pass `task.timeout.ms` into `poll()` directly right now as it simplifies our own code:
// if we don't pass it in, we would just track the timeout ourselves and call `poll()` again
// in our own retry loop; by passing the timeout we can reuse the consumer's internal retry loop instead
//
// note that using `request.timeout.ms` provides a conservative upper bound for the timeout;
// this implies that we might start `task.timeout.ms` "delayed" -- however, starting the timeout
// delayed is preferable (as it's more robust) than starting it too early
//
// TODO https://issues.apache.org/jira/browse/KAFKA-10315
// -> do a more precise timeout handling if `poll` would throw an exception if a fetch request fails
// (instead of letting the consumer retry fetch requests silently)
//
// TODO https://issues.apache.org/jira/browse/KAFKA-10317 and
// https://issues.apache.org/jira/browse/KAFKA-7380
// -> don't pass in `task.timeout.ms` to stay responsive if `KafkaStreams#close` gets called
final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(requestTimeoutPlusTaskTimeout);
if (records.isEmpty()) {
// this will always throw
maybeUpdateDeadlineOrThrow(time.milliseconds());
}
final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new ArrayList<>();
for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
if (record.key() != null) {
restoreRecords.add(recordConverter.convert(record));
}
}
offset = globalConsumer.position(topicPartition);
offset = retryUntilSuccessOrThrowOnTaskTimeout(
() -> globalConsumer.position(topicPartition),
String.format(
"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.",
topicPartition
)
);
stateRestoreAdapter.restoreBatch(restoreRecords);
stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
restoreCount += restoreRecords.size();
@ -307,6 +329,48 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
}
}
private <R> R retryUntilSuccessOrThrowOnTaskTimeout(final Supplier<R> supplier,
final String errorMessage) {
long deadlineMs = NO_DEADLINE;
do {
try {
return supplier.get();
} catch (final TimeoutException retriableException) {
if (taskTimeoutMs == 0L) {
throw new StreamsException(
String.format(
"Retrying is disabled. You can enable it by setting `%s` to a value larger than zero.",
StreamsConfig.TASK_TIMEOUT_MS_CONFIG
),
retriableException
);
}
deadlineMs = maybeUpdateDeadlineOrThrow(deadlineMs);
log.warn(errorMessage, retriableException);
}
} while (true);
}
private long maybeUpdateDeadlineOrThrow(final long currentDeadlineMs) {
final long currentWallClockMs = time.milliseconds();
if (currentDeadlineMs == NO_DEADLINE) {
final long newDeadlineMs = currentWallClockMs + taskTimeoutMs;
return newDeadlineMs < 0L ? Long.MAX_VALUE : newDeadlineMs;
} else if (currentWallClockMs >= currentDeadlineMs) {
throw new TimeoutException(String.format(
"Global task did not make progress to restore state within %d ms. Adjust `%s` if needed.",
currentWallClockMs - currentDeadlineMs + taskTimeoutMs,
StreamsConfig.TASK_TIMEOUT_MS_CONFIG
));
}
return currentDeadlineMs;
}
@Override
public void flush() {
log.debug("Flushing all global globalStores registered in the state manager");


@ -337,17 +337,20 @@ public class GlobalStreamThread extends Thread {
try {
final GlobalStateManager stateMgr = new GlobalStateManagerImpl(
logContext,
time,
topology,
globalConsumer,
stateDirectory,
stateRestoreListener,
config);
config
);
final GlobalProcessorContextImpl globalProcessorContext = new GlobalProcessorContextImpl(
config,
stateMgr,
streamsMetrics,
cache);
cache
);
stateMgr.setGlobalProcessorContext(globalProcessorContext);
final StateConsumer stateConsumer = new StateConsumer(


@ -30,7 +30,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.ClientUtils.QuietAdminClientConfig;
import org.slf4j.Logger;
import java.util.HashMap;
@ -64,7 +63,7 @@ public class InternalTopicManager {
replicationFactor = streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
windowChangeLogAdditionalRetention = streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
final QuietAdminClientConfig adminConfigs = new QuietAdminClientConfig(streamsConfig);
final AdminClientConfig adminConfigs = new ClientUtils.QuietAdminClientConfig(streamsConfig);
retries = adminConfigs.getInt(AdminClientConfig.RETRIES_CONFIG);
retryBackOffMs = adminConfigs.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);


@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.TaskManager;
@ -57,7 +57,7 @@ public final class AssignorConfiguration {
// NOTE: If you add a new config to pass through to here, be sure to test it in a real
// application. Since we filter out some configurations, we may have to explicitly copy
// them over when we construct the Consumer.
streamsConfig = new QuietStreamsConfig(configs);
streamsConfig = new ClientUtils.QuietStreamsConfig(configs);
internalConfigs = configs;
// Setting the logger with the passed in client thread name


@ -16,7 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
@ -47,7 +49,9 @@ import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -58,10 +62,13 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -130,11 +137,13 @@ public class GlobalStateManagerImplTest {
consumer = new MockConsumer<>(OffsetResetStrategy.NONE);
stateManager = new GlobalStateManagerImpl(
new LogContext("test"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig);
streamsConfig
);
processorContext = new InternalMockProcessorContext(stateDirectory.globalStateDir(), streamsConfig);
stateManager.setGlobalProcessorContext(processorContext);
checkpointFile = new File(stateManager.baseDir(), StateManagerUtil.CHECKPOINT_FILE_NAME);
@ -340,7 +349,7 @@ public class GlobalStateManagerImplTest {
}
@Test
public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws IOException {
public void shouldRestoreRecordsFromCheckpointToHighWatermark() throws IOException {
initializeConsumer(5, 5, t1);
final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
@ -576,6 +585,7 @@ public class GlobalStateManagerImplTest {
public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() {
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
new StateDirectory(streamsConfig, time, true) {
@ -596,72 +606,602 @@ public class GlobalStateManagerImplTest {
}
}
@SuppressWarnings("deprecation") // TODO revisit in follow up PR
@Test
public void shouldRetryWhenEndOffsetsThrowsTimeoutException() {
final int retries = 2;
public void shouldNotRetryWhenEndOffsetsThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized Map<TopicPartition, Long> endOffsets(final Collection<org.apache.kafka.common.TopicPartition> partitions) {
public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
numberOfCalls.incrementAndGet();
throw new TimeoutException();
throw new TimeoutException("KABOOM!");
}
};
streamsConfig = new StreamsConfig(new Properties() {
{
put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
put(StreamsConfig.RETRIES_CONFIG, retries);
}
});
initializeConsumer(0, 0, t1, t2, t3, t4);
try {
new GlobalStateManagerImpl(
new LogContext("mock"),
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig);
} catch (final StreamsException expected) {
assertEquals(numberOfCalls.get(), retries);
}
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 0L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
final StreamsException expected = assertThrows(
StreamsException.class,
() -> stateManager.initialize()
);
final Throwable cause = expected.getCause();
assertThat(cause, instanceOf(TimeoutException.class));
assertThat(cause.getMessage(), equalTo("KABOOM!"));
assertEquals(numberOfCalls.get(), 1);
}
@SuppressWarnings("deprecation") // TODO revisit in follow up PR
@Test
public void shouldRetryWhenPartitionsForThrowsTimeoutException() {
final int retries = 2;
public void shouldRetryAtLeastOnceWhenEndOffsetsThrowsTimeoutException() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized List<PartitionInfo> partitionsFor(final String topic) {
public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
time.sleep(100L);
numberOfCalls.incrementAndGet();
throw new TimeoutException();
throw new TimeoutException("KABOOM!");
}
};
streamsConfig = new StreamsConfig(new Properties() {
{
put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
put(StreamsConfig.RETRIES_CONFIG, retries);
}
});
initializeConsumer(0, 0, t1, t2, t3, t4);
try {
new GlobalStateManagerImpl(
new LogContext("mock"),
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig);
} catch (final StreamsException expected) {
assertEquals(numberOfCalls.get(), retries);
}
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
final TimeoutException expected = assertThrows(
TimeoutException.class,
() -> stateManager.initialize()
);
assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed."));
assertEquals(numberOfCalls.get(), 2);
}
@Test
public void shouldRetryWhenEndOffsetsThrowsTimeoutExceptionUntilTaskTimeoutExpired() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
time.sleep(100L);
numberOfCalls.incrementAndGet();
throw new TimeoutException("KABOOM!");
}
};
initializeConsumer(0, 0, t1, t2, t3, t4);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
final TimeoutException expected = assertThrows(
TimeoutException.class,
() -> stateManager.initialize()
);
assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 1000 ms. Adjust `task.timeout.ms` if needed."));
assertEquals(numberOfCalls.get(), 11);
}
@Test
public void shouldNotFailOnSlowProgressWhenEndOffsetsThrowsTimeoutException() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized Map<TopicPartition, Long> endOffsets(final Collection<TopicPartition> partitions) {
time.sleep(1L);
if (numberOfCalls.incrementAndGet() % 3 == 0) {
return super.endOffsets(partitions);
}
throw new TimeoutException("KABOOM!");
}
@Override
public synchronized long position(final TopicPartition partition) {
return numberOfCalls.incrementAndGet();
}
};
initializeConsumer(0, 0, t1, t2, t3, t4);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 10L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
stateManager.initialize();
}
@Test
public void shouldNotRetryWhenPartitionsForThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public List<PartitionInfo> partitionsFor(final String topic) {
numberOfCalls.incrementAndGet();
throw new TimeoutException("KABOOM!");
}
};
initializeConsumer(0, 0, t1, t2, t3, t4);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 0L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
final StreamsException expected = assertThrows(
StreamsException.class,
() -> stateManager.initialize()
);
final Throwable cause = expected.getCause();
assertThat(cause, instanceOf(TimeoutException.class));
assertThat(cause.getMessage(), equalTo("KABOOM!"));
assertEquals(numberOfCalls.get(), 1);
}
@Test
public void shouldRetryAtLeastOnceWhenPartitionsForThrowsTimeoutException() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public List<PartitionInfo> partitionsFor(final String topic) {
time.sleep(100L);
numberOfCalls.incrementAndGet();
throw new TimeoutException("KABOOM!");
}
};
initializeConsumer(0, 0, t1, t2, t3, t4);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
final TimeoutException expected = assertThrows(
TimeoutException.class,
() -> stateManager.initialize()
);
assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed."));
assertEquals(numberOfCalls.get(), 2);
}
@Test
public void shouldRetryWhenPartitionsForThrowsTimeoutExceptionUntilTaskTimeoutExpires() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public List<PartitionInfo> partitionsFor(final String topic) {
time.sleep(100L);
numberOfCalls.incrementAndGet();
throw new TimeoutException("KABOOM!");
}
};
initializeConsumer(0, 0, t1, t2, t3, t4);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
final TimeoutException expected = assertThrows(
TimeoutException.class,
() -> stateManager.initialize()
);
assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 1000 ms. Adjust `task.timeout.ms` if needed."));
assertEquals(numberOfCalls.get(), 11);
}
@Test
public void shouldNotFailOnSlowProgressWhenPartitionForThrowsTimeoutException() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public List<PartitionInfo> partitionsFor(final String topic) {
time.sleep(1L);
if (numberOfCalls.incrementAndGet() % 3 == 0) {
return super.partitionsFor(topic);
}
throw new TimeoutException("KABOOM!");
}
@Override
public synchronized long position(final TopicPartition partition) {
return numberOfCalls.incrementAndGet();
}
};
initializeConsumer(0, 0, t1, t2, t3, t4);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 10L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
stateManager.initialize();
}
@Test
public void shouldNotRetryWhenPositionThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized long position(final TopicPartition partition) {
numberOfCalls.incrementAndGet();
throw new TimeoutException("KABOOM!");
}
};
initializeConsumer(0, 0, t1, t2, t3, t4);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 0L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
final StreamsException expected = assertThrows(
StreamsException.class,
() -> stateManager.initialize()
);
final Throwable cause = expected.getCause();
assertThat(cause, instanceOf(TimeoutException.class));
assertThat(cause.getMessage(), equalTo("KABOOM!"));
assertEquals(numberOfCalls.get(), 1);
}
@Test
public void shouldRetryAtLeastOnceWhenPositionThrowsTimeoutException() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized long position(final TopicPartition partition) {
time.sleep(100L);
numberOfCalls.incrementAndGet();
throw new TimeoutException("KABOOM!");
}
};
initializeConsumer(0, 0, t1, t2, t3, t4);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
final TimeoutException expected = assertThrows(
TimeoutException.class,
() -> stateManager.initialize()
);
assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 100 ms. Adjust `task.timeout.ms` if needed."));
assertEquals(numberOfCalls.get(), 2);
}
@Test
public void shouldRetryWhenPositionThrowsTimeoutExceptionUntilTaskTimeoutExpired() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized long position(final TopicPartition partition) {
time.sleep(100L);
numberOfCalls.incrementAndGet();
throw new TimeoutException("KABOOM!");
}
};
initializeConsumer(0, 0, t1, t2, t3, t4);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
final TimeoutException expected = assertThrows(
TimeoutException.class,
() -> stateManager.initialize()
);
assertThat(expected.getMessage(), equalTo("Global task did not make progress to restore state within 1000 ms. Adjust `task.timeout.ms` if needed."));
assertEquals(numberOfCalls.get(), 11);
}
@Test
public void shouldNotFailOnSlowProgressWhenPositionThrowsTimeoutException() {
final AtomicInteger numberOfCalls = new AtomicInteger(0);
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized long position(final TopicPartition partition) {
time.sleep(1L);
if (numberOfCalls.incrementAndGet() % 3 == 0) {
return numberOfCalls.incrementAndGet();
}
throw new TimeoutException("KABOOM!");
}
};
initializeConsumer(0, 0, t1, t2, t3, t4);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 10L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
stateManager.initialize();
}
@Test
public void shouldUseRequestTimeoutPlusTaskTimeoutInPollDuringRestoreAndFailIfNoDataReturned() {
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
time.sleep(timeout.toMillis());
return super.poll(timeout);
}
};
final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
startOffsets.put(t1, 1L);
final HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(t1, 3L);
consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
consumer.assign(Collections.singletonList(t1));
consumer.updateBeginningOffsets(startOffsets);
consumer.updateEndOffsets(endOffsets);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.POLL_MS_CONFIG, 5L),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 10L),
mkEntry(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
final long startTime = time.milliseconds();
final TimeoutException exception = assertThrows(
TimeoutException.class,
() -> stateManager.initialize()
);
assertThat(
exception.getMessage(),
equalTo("Global task did not make progress to restore state within 10 ms. Adjust `task.timeout.ms` if needed.")
);
assertThat(time.milliseconds() - startTime, equalTo(110L));
}
@Test
public void shouldTimeoutWhenNoProgressDuringRestore() {
consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
@Override
public synchronized ConsumerRecords<byte[], byte[]> poll(final Duration timeout) {
time.sleep(1L);
return super.poll(timeout);
}
};
final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
startOffsets.put(t1, 1L);
final HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(t1, 3L);
consumer.updatePartitions(t1.topic(), Collections.singletonList(new PartitionInfo(t1.topic(), t1.partition(), null, null, null)));
consumer.assign(Collections.singletonList(t1));
consumer.updateBeginningOffsets(startOffsets);
consumer.updateEndOffsets(endOffsets);
streamsConfig = new StreamsConfig(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 5L)
));
stateManager = new GlobalStateManagerImpl(
new LogContext("mock"),
time,
topology,
consumer,
stateDirectory,
stateRestoreListener,
streamsConfig
);
processorContext.setStateManger(stateManager);
stateManager.setGlobalProcessorContext(processorContext);
final long startTime = time.milliseconds();
final TimeoutException exception = assertThrows(
TimeoutException.class,
() -> stateManager.initialize()
);
assertThat(
exception.getMessage(),
equalTo("Global task did not make progress to restore state within 5 ms. Adjust `task.timeout.ms` if needed.")
);
assertThat(time.milliseconds() - startTime, equalTo(1L));
}
private void writeCorruptCheckpoint() throws IOException {
@ -671,19 +1211,21 @@ public class GlobalStateManagerImplTest {
}
}
private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition topicPartition) {
final HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
startOffsets.put(topicPartition, startOffset);
final HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(topicPartition, startOffset + numRecords);
consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, null, null)));
consumer.assign(Collections.singletonList(topicPartition));
private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition... topicPartitions) {
consumer.assign(Arrays.asList(topicPartitions));
final Map<TopicPartition, Long> startOffsets = new HashMap<>();
final Map<TopicPartition, Long> endOffsets = new HashMap<>();
for (final TopicPartition topicPartition : topicPartitions) {
startOffsets.put(topicPartition, startOffset);
endOffsets.put(topicPartition, startOffset + numRecords);
consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, null, null)));
for (int i = 0; i < numRecords; i++) {
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), startOffset + i, "key".getBytes(), "value".getBytes()));
}
}
consumer.updateEndOffsets(endOffsets);
consumer.updateBeginningOffsets(startOffsets);
for (int i = 0; i < numRecords; i++) {
consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), startOffset + i, "key".getBytes(), "value".getBytes()));
}
}
private Map<TopicPartition, Long> writeCheckpoint() throws IOException {


@ -62,6 +62,7 @@ public class InternalMockProcessorContext
extends AbstractProcessorContext
implements RecordCollector.Supplier {
private StateManager stateManager = new StateManagerStub();
private final File stateDir;
private final RecordCollector.Supplier recordCollectorSupplier;
private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
@ -197,7 +198,11 @@ public class InternalMockProcessorContext
@Override
protected StateManager stateManager() {
return new StateManagerStub();
return stateManager;
}
public void setStateManger(final StateManager stateManger) {
this.stateManager = stateManger;
}
@Override
@ -245,6 +250,7 @@ public class InternalMockProcessorContext
final StateRestoreCallback func) {
storeMap.put(store.name(), store);
restoreFuncs.put(store.name(), func);
stateManager().registerStore(store, func);
}
@Override


@ -78,6 +78,7 @@ public class NoOpReadOnlyStore<K, V> implements ReadOnlyKeyValueStore<K, V>, Sta
new File(context.stateDir() + File.separator + name).mkdir();
}
this.initialized = true;
context.register(root, (k, v) -> { });
}
@Override


@ -43,7 +43,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.KeyValueStoreFacade;
import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.internals.WindowStoreFacade;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
@ -52,6 +51,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRegister;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
@ -300,7 +300,7 @@ public class TopologyTestDriver implements Closeable {
private TopologyTestDriver(final InternalTopologyBuilder builder,
final Properties config,
final long initialWallClockTimeMs) {
final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
logIfTaskIdleEnabled(streamsConfig);
logContext = new LogContext("topology-test-driver ");
@ -350,7 +350,7 @@ public class TopologyTestDriver implements Closeable {
logContext
);
setupGlobalTask(streamsConfig, streamsMetrics, cache);
setupGlobalTask(mockWallClockTime, streamsConfig, streamsMetrics, cache);
setupTask(streamsConfig, streamsMetrics, cache);
}
@ -407,7 +407,8 @@ public class TopologyTestDriver implements Closeable {
stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime, createStateDirectory);
}
private void setupGlobalTask(final StreamsConfig streamsConfig,
private void setupGlobalTask(final Time mockWallClockTime,
final StreamsConfig streamsConfig,
final StreamsMetricsImpl streamsMetrics,
final ThreadCache cache) {
if (globalTopology != null) {
@ -424,6 +425,7 @@ public class TopologyTestDriver implements Closeable {
globalStateManager = new GlobalStateManagerImpl(
logContext,
mockWallClockTime,
globalTopology,
globalConsumer,
stateDirectory,


@ -27,9 +27,9 @@ import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.internals.QuietStreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
@ -217,7 +217,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
*/
@SuppressWarnings({"WeakerAccess", "unused"})
public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) {
final StreamsConfig streamsConfig = new QuietStreamsConfig(config);
final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config);
this.taskId = taskId;
this.config = streamsConfig;
this.stateDir = stateDir;