diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 5656c914f7b..798f0b089a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -171,26 +171,6 @@ public abstract class AbstractTask implements Task { return sb.toString(); } - protected void updateOffsetLimits() { - for (final TopicPartition partition : partitions) { - try { - final OffsetAndMetadata metadata = consumer.committed(partition); // TODO: batch API? - final long offset = metadata != null ? metadata.offset() : 0L; - stateMgr.putOffsetLimit(partition, offset); - - if (log.isTraceEnabled()) { - log.trace("Updating store offset limits {} for changelog {}", offset, partition); - } - } catch (final AuthorizationException e) { - throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e); - } catch (final WakeupException e) { - throw e; - } catch (final KafkaException e) { - throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", id, partition), e); - } - } - } - /** * Flush all state stores owned by this task */ @@ -219,9 +199,6 @@ public abstract class AbstractTask implements Task { } log.trace("Initializing state stores"); - // set initial offset limits - updateOffsetLimits(); - for (final StateStore store : topology.stateStores()) { log.trace("Initializing store {}", store.name()); processorContext.uninitialize(); @@ -272,4 +249,18 @@ public abstract class AbstractTask implements Task { public Collection changelogPartitions() { return stateMgr.changelogPartitions(); } + + long committedOffsetForPartition(final TopicPartition partition) { + try { + final OffsetAndMetadata metadata = consumer.committed(partition); + return metadata != null ? metadata.offset() : 0L; + } catch (final AuthorizationException e) { + throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e); + } catch (final WakeupException e) { + throw e; + } catch (final KafkaException e) { + throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", id, partition), e); + } + } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java index a99e45147b9..6025e885a13 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java @@ -24,4 +24,14 @@ class AssignedStandbyTasks extends AssignedTasks { super(logContext, "standby task"); } + @Override + int commit() { + final int committed = super.commit(); + // TODO: this contortion would not be necessary if we got rid of the two-step + // task.commitNeeded and task.commit and instead just had task.commitIfNeeded. Currently + // we only call commit if commitNeeded is true, which means that we need a way to indicate + // that we are eligible for updating the offset limit outside of commit. 
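// Illustrative sketch (hypothetical names, not taken from this patch): the single-step
// task.commitIfNeeded() API that the TODO above alludes to. With such a contract the
// standby task could re-arm its offset-limit refresh inside the commit call itself,
// and AssignedStandbyTasks would not need the extra allowUpdateOfOffsetLimit() pass below.
interface CommittableTask {
    boolean commitIfNeeded(); // returns true only if a commit was actually performed
}

class StandbyTaskCommitSketch implements CommittableTask {
    private boolean commitNeeded;

    @Override
    public boolean commitIfNeeded() {
        if (!commitNeeded) {
            return false;
        }
        flushAndCheckpointState();   // flush stores and write the checkpoint file
        allowUpdateOfOffsetLimit();  // offset-limit refresh stays inside the task
        commitNeeded = false;
        return true;
    }

    private void flushAndCheckpointState() { }
    private void allowUpdateOfOffsetLimit() { }
}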
+ running.forEach((id, task) -> task.allowUpdateOfOffsetLimit()); + return committed; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 424ded4381c..c758ccd2e7b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -16,6 +16,14 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; @@ -25,20 +33,14 @@ import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - /** * A StandbyTask */ public class StandbyTask extends AbstractTask { - private Map checkpointedOffsets = new HashMap<>(); private final Sensor closeTaskSensor; + private final Map offsetLimits = new HashMap<>(); + private final Set updateableOffsetLimits = new HashSet<>(); /** * Create {@link StandbyTask} with its assigned partitions @@ -63,6 +65,14 @@ public class StandbyTask extends AbstractTask { closeTaskSensor = metrics.threadLevelSensor("task-closed", Sensor.RecordingLevel.INFO); processorContext = new StandbyContextImpl(id, config, stateMgr, metrics); + + final Set changelogTopicNames = new HashSet<>(topology.storeToChangelogTopic().values()); + partitions.stream() + .filter(tp -> changelogTopicNames.contains(tp.topic())) + .forEach(tp -> { + offsetLimits.put(tp, 0L); + updateableOffsetLimits.add(tp); + }); } @Override @@ -88,7 +98,7 @@ public class StandbyTask extends AbstractTask { @Override public void resume() { log.debug("Resuming"); - updateOffsetLimits(); + allowUpdateOfOffsetLimit(); } /** @@ -102,9 +112,7 @@ public class StandbyTask extends AbstractTask { public void commit() { log.trace("Committing"); flushAndCheckpointState(); - // reinitialize offset limits - updateOffsetLimits(); - + allowUpdateOfOffsetLimit(); commitNeeded = false; } @@ -165,14 +173,25 @@ public class StandbyTask extends AbstractTask { */ public List> update(final TopicPartition partition, final List> records) { + if (records.isEmpty()) { + return Collections.emptyList(); + } + log.trace("Updating standby replicas of its state store for partition [{}]", partition); - final long limit = stateMgr.offsetLimit(partition); + long limit = offsetLimits.getOrDefault(partition, Long.MAX_VALUE); long lastOffset = -1L; final List> restoreRecords = new ArrayList<>(records.size()); final List> remainingRecords = new ArrayList<>(); for (final ConsumerRecord record : records) { + // Check if we're unable to process records due to an offset limit (e.g. when our + // partition is both a source and a changelog). If we're limited then try to refresh + // the offset limit if possible. 
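// Condensed sketch (simplified, hypothetical class name) of the bookkeeping that the new
// offsetLimits / updateableOffsetLimits fields implement for partitions that are both a
// source and a changelog: the limit starts at 0 and is flagged refreshable; update() may
// refresh it at most once from consumer.committed() (as in the loop below); commit() and
// resume() re-arm the flag so a later batch can refresh it again.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class StandbyOffsetLimitSketch {
    private final Consumer<?, ?> consumer;
    private final Map<TopicPartition, Long> offsetLimits = new HashMap<>();
    private final Set<TopicPartition> updateable = new HashSet<>();

    StandbyOffsetLimitSketch(final Consumer<?, ?> consumer, final Set<TopicPartition> sourceChangelogPartitions) {
        this.consumer = consumer;
        for (final TopicPartition tp : sourceChangelogPartitions) {
            offsetLimits.put(tp, 0L); // nothing may be applied until a committed offset is known
            updateable.add(tp);
        }
    }

    long limitFor(final TopicPartition tp, final long nextRecordOffset) {
        long limit = offsetLimits.getOrDefault(tp, Long.MAX_VALUE);
        // Only consult the consumer when a record is actually blocked by the cached limit,
        // and only if the partition has been (re-)armed by the constructor, commit() or resume().
        if (nextRecordOffset >= limit && updateable.remove(tp)) {
            final OffsetAndMetadata committed = consumer.committed(tp);
            limit = committed != null ? committed.offset() : 0L;
            offsetLimits.put(tp, limit);
        }
        return limit;
    }

    void onCommitOrResume() {
        updateable.addAll(offsetLimits.keySet());
    }
}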
+ if (record.offset() >= limit && updateableOffsetLimits.contains(partition)) { + limit = updateOffsetLimits(partition); + } + if (record.offset() < limit) { restoreRecords.add(record); lastOffset = record.offset(); @@ -181,9 +200,8 @@ public class StandbyTask extends AbstractTask { } } - stateMgr.updateStandbyStates(partition, restoreRecords, lastOffset); - if (!restoreRecords.isEmpty()) { + stateMgr.updateStandbyStates(partition, restoreRecords, lastOffset); commitNeeded = true; } @@ -194,4 +212,23 @@ public class StandbyTask extends AbstractTask { return checkpointedOffsets; } + private long updateOffsetLimits(final TopicPartition partition) { + if (!offsetLimits.containsKey(partition)) { + throw new IllegalArgumentException("Topic is not both a source and a changelog: " + partition); + } + + updateableOffsetLimits.remove(partition); + + final long newLimit = committedOffsetForPartition(partition); + final long previousLimit = offsetLimits.put(partition, newLimit); + if (previousLimit > newLimit) { + throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. " + + "New limit: " + newLimit + ". Previous limit: " + previousLimit); + } + return newLimit; + } + + void allowUpdateOfOffsetLimit() { + updateableOffsetLimits.addAll(offsetLimits.keySet()); + } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index c8a9842ff9a..adb886969ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Map.Entry; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -44,7 +45,7 @@ public class StoreChangelogReader implements ChangelogReader { private final Logger log; private final Consumer restoreConsumer; private final StateRestoreListener userStateRestoreListener; - private final Map endOffsets = new HashMap<>(); + private final Map restoreToOffsets = new HashMap<>(); private final Map> partitionInfo = new HashMap<>(); private final Map stateRestorers = new HashMap<>(); private final Set needsRestoring = new HashSet<>(); @@ -89,11 +90,11 @@ public class StoreChangelogReader implements ChangelogReader { for (final TopicPartition partition : needsRestoring) { final StateRestorer restorer = stateRestorers.get(partition); - final long pos = processNext(records.records(partition), restorer, endOffsets.get(partition)); + final long pos = processNext(records.records(partition), restorer, restoreToOffsets.get(partition)); restorer.setRestoredOffset(pos); - if (restorer.hasCompleted(pos, endOffsets.get(partition))) { + if (restorer.hasCompleted(pos, restoreToOffsets.get(partition))) { restorer.restoreDone(); - endOffsets.remove(partition); + restoreToOffsets.remove(partition); completedRestorers.add(partition); } } @@ -141,39 +142,44 @@ public class StoreChangelogReader implements ChangelogReader { // try to fetch end offsets for the initializable restorers and remove any partitions // where we already have all of the data + final Map endOffsets; try { - endOffsets.putAll(restoreConsumer.endOffsets(initializable)); + endOffsets = 
restoreConsumer.endOffsets(initializable); } catch (final TimeoutException e) { // if timeout exception gets thrown we just give up this time and retry in the next run loop log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable); return; } + endOffsets.forEach((partition, endOffset) -> { + if (endOffset != null) { + final StateRestorer restorer = stateRestorers.get(partition); + final long offsetLimit = restorer.offsetLimit(); + restoreToOffsets.put(partition, Math.min(endOffset, offsetLimit)); + } else { + log.info("End offset cannot be found form the returned metadata; removing this partition from the current run loop"); + initializable.remove(partition); + } + }); + final Iterator iter = initializable.iterator(); while (iter.hasNext()) { final TopicPartition topicPartition = iter.next(); - final Long endOffset = endOffsets.get(topicPartition); + final Long restoreOffset = restoreToOffsets.get(topicPartition); + final StateRestorer restorer = stateRestorers.get(topicPartition); - // offset should not be null; but since the consumer API does not guarantee it - // we add this check just in case - if (endOffset != null) { - final StateRestorer restorer = stateRestorers.get(topicPartition); - if (restorer.checkpoint() >= endOffset) { - restorer.setRestoredOffset(restorer.checkpoint()); - iter.remove(); - completedRestorers.add(topicPartition); - } else if (restorer.offsetLimit() == 0 || endOffset == 0) { - restorer.setRestoredOffset(0); - iter.remove(); - completedRestorers.add(topicPartition); - } else { - restorer.setEndingOffset(endOffset); - } - needsInitializing.remove(topicPartition); - } else { - log.info("End offset cannot be found form the returned metadata; removing this partition from the current run loop"); + if (restorer.checkpoint() >= restoreOffset) { + restorer.setRestoredOffset(restorer.checkpoint()); iter.remove(); + completedRestorers.add(topicPartition); + } else if (restoreOffset == 0) { + restorer.setRestoredOffset(0); + iter.remove(); + completedRestorers.add(topicPartition); + } else { + restorer.setEndingOffset(restoreOffset); } + needsInitializing.remove(topicPartition); } // set up restorer for those initializable @@ -200,7 +206,7 @@ public class StoreChangelogReader implements ChangelogReader { restoreConsumer.seek(partition, restorer.checkpoint()); logRestoreOffsets(partition, restorer.checkpoint(), - endOffsets.get(partition)); + restoreToOffsets.get(partition)); restorer.setStartingOffset(restoreConsumer.position(partition)); restorer.restoreStarted(); } else { @@ -232,7 +238,7 @@ public class StoreChangelogReader implements ChangelogReader { final long position = restoreConsumer.position(restorer.partition()); logRestoreOffsets(restorer.partition(), position, - endOffsets.get(restorer.partition())); + restoreToOffsets.get(restorer.partition())); restorer.setStartingOffset(position); restorer.restoreStarted(); } @@ -279,7 +285,7 @@ public class StoreChangelogReader implements ChangelogReader { partitionInfo.clear(); stateRestorers.clear(); needsRestoring.clear(); - endOffsets.clear(); + restoreToOffsets.clear(); needsInitializing.clear(); completedRestorers.clear(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index d6ee2dff735..3dd83b5b00c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -16,6 +16,19 @@ */ package org.apache.kafka.streams.processor.internals; +import static java.lang.String.format; +import static java.util.Collections.singleton; +import static org.apache.kafka.streams.kstream.internals.metrics.Sensors.recordLatenessSensor; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -48,18 +61,6 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; import org.apache.kafka.streams.state.internals.ThreadCache; -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static java.lang.String.format; -import static java.util.Collections.singleton; -import static org.apache.kafka.streams.kstream.internals.metrics.Sensors.recordLatenessSensor; - /** * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing. */ @@ -236,6 +237,22 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator @Override public boolean initializeStateStores() { log.trace("Initializing state stores"); + + // Currently there is no easy way to tell the ProcessorStateManager to only restore up to + // a specific offset. In most cases this is fine. However, in optimized topologies we can + // have a source topic that also serves as a changelog, and in this case we want our active + // stream task to only play records up to the last consumer committed offset. Here we find + // partitions of topics that are both sources and changelogs and set the consumer committed + // offset via stateMgr as there is not a more direct route. + final Set changelogTopicNames = new HashSet<>(topology.storeToChangelogTopic().values()); + partitions.stream() + .filter(tp -> changelogTopicNames.contains(tp.topic())) + .forEach(tp -> { + final long offset = committedOffsetForPartition(tp); + stateMgr.putOffsetLimit(tp, offset); + log.trace("Updating store offset limits {} for changelog {}", offset, tp); + }); + registerStateStores(); return changelogPartitions().isEmpty(); @@ -460,7 +477,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator final TopicPartition partition = entry.getKey(); final long offset = entry.getValue() + 1; consumedOffsetsAndMetadata.put(partition, new OffsetAndMetadata(offset)); - stateMgr.putOffsetLimit(partition, offset); } try { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java new file mode 100644 index 00000000000..22083c117ab --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.StateRestoreListener; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(IntegrationTest.class) +public class OptimizedKTableIntegrationTest { + private static final int NUM_BROKERS = 1; + + private static final String INPUT_TOPIC_NAME = "input-topic"; + private static final String TABLE_NAME = "source-table"; + + @Rule + public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS); + + private final Map kafkaStreamsStates = new HashMap<>(); + private final Lock kafkaStreamsStatesLock = new ReentrantLock(); + private final Condition kafkaStreamsStateUpdate = kafkaStreamsStatesLock.newCondition(); + private final MockTime mockTime = cluster.time; + + @Before + 
public void before() throws InterruptedException { + cluster.createTopic(INPUT_TOPIC_NAME, 2, 1); + } + + @After + public void after() { + for (final KafkaStreams kafkaStreams : kafkaStreamsStates.keySet()) { + kafkaStreams.close(); + } + } + + @Test + public void standbyShouldNotPerformRestoreAtStartup() throws Exception { + final int numMessages = 10; + final int key = 1; + final Semaphore semaphore = new Semaphore(0); + + final StreamsBuilder builder = new StreamsBuilder(); + builder + .table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), + Materialized.>as(TABLE_NAME) + .withCachingDisabled()) + .toStream() + .peek((k, v) -> semaphore.release()); + + final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration()); + final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration()); + final List kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2); + + produceValueRange(key, 0, 10); + + final AtomicLong restoreStartOffset = new AtomicLong(-1); + kafkaStreamsList.forEach(kafkaStreams -> { + kafkaStreams.setGlobalStateRestoreListener(createTrackingRestoreListener(restoreStartOffset, new AtomicLong())); + kafkaStreams.start(); + }); + waitForKafkaStreamssToEnterRunningState(kafkaStreamsList, 60, TimeUnit.SECONDS); + + // Assert that all messages in the first batch were processed in a timely manner + assertThat(semaphore.tryAcquire(numMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); + + // Assert that no restore occurred + assertThat(restoreStartOffset.get(), is(equalTo(-1L))); + } + + @Test + public void shouldApplyUpdatesToStandbyStore() throws Exception { + final int batch1NumMessages = 100; + final int batch2NumMessages = 100; + final int key = 1; + final Semaphore semaphore = new Semaphore(0); + + final StreamsBuilder builder = new StreamsBuilder(); + builder + .table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), + Materialized.>as(TABLE_NAME) + .withCachingDisabled()) + .toStream() + .peek((k, v) -> semaphore.release()); + + final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration()); + final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration()); + final List kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2); + + final AtomicLong restoreStartOffset = new AtomicLong(-1L); + final AtomicLong restoreEndOffset = new AtomicLong(-1L); + kafkaStreamsList.forEach(kafkaStreams -> { + kafkaStreams.setGlobalStateRestoreListener(createTrackingRestoreListener(restoreStartOffset, restoreEndOffset)); + kafkaStreams.start(); + }); + waitForKafkaStreamssToEnterRunningState(kafkaStreamsList, 60, TimeUnit.SECONDS); + + produceValueRange(key, 0, batch1NumMessages); + + // Assert that all messages in the first batch were processed in a timely manner + assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); + + final ReadOnlyKeyValueStore store1 = kafkaStreams1 + .store(TABLE_NAME, QueryableStoreTypes.keyValueStore()); + + final ReadOnlyKeyValueStore store2 = kafkaStreams2 + .store(TABLE_NAME, QueryableStoreTypes.keyValueStore()); + + final boolean kafkaStreams1WasFirstActive; + if (store1.get(key) != null) { + kafkaStreams1WasFirstActive = true; + } else { + // Assert that data from the job was sent to the store + assertThat(store2.get(key), is(notNullValue())); + kafkaStreams1WasFirstActive = false; + } + + // Assert that no restore has occurred, ensures that when we check later that the restore + // 
notification actually came from after the rebalance. + assertThat(restoreStartOffset.get(), is(equalTo(-1L))); + + // Assert that the current value in store reflects all messages being processed + assertThat(kafkaStreams1WasFirstActive ? store1.get(key) : store2.get(key), is(equalTo(batch1NumMessages - 1))); + + if (kafkaStreams1WasFirstActive) { + kafkaStreams1.close(); + } else { + kafkaStreams2.close(); + } + + final ReadOnlyKeyValueStore newActiveStore = + kafkaStreams1WasFirstActive ? store2 : store1; + retryOnExceptionWithTimeout(100, 60 * 1000, TimeUnit.MILLISECONDS, () -> { + // Assert that after failover we have recovered to the last store write + assertThat(newActiveStore.get(key), is(equalTo(batch1NumMessages - 1))); + }); + + final int totalNumMessages = batch1NumMessages + batch2NumMessages; + + produceValueRange(key, batch1NumMessages, totalNumMessages); + + // Assert that all messages in the second batch were processed in a timely manner + assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); + + // Assert that either restore was unnecessary or we restored from an offset later than 0 + assertThat(restoreStartOffset.get(), is(anyOf(greaterThan(0L), equalTo(-1L)))); + + // Assert that either restore was unnecessary or we restored to the last offset before we closed the kafkaStreams + assertThat(restoreEndOffset.get(), is(anyOf(equalTo(batch1NumMessages - 1), equalTo(-1L)))); + + // Assert that the current value in store reflects all messages being processed + assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1))); + } + + private void produceValueRange(final int key, final int start, final int endExclusive) throws Exception { + final Properties producerProps = new Properties(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + + IntegrationTestUtils.produceKeyValuesSynchronously( + INPUT_TOPIC_NAME, + IntStream.range(start, endExclusive) + .mapToObj(i -> KeyValue.pair(key, i)) + .collect(Collectors.toList()), + producerProps, + mockTime); + } + + private void retryOnExceptionWithTimeout(final long pollInterval, + final long timeout, + final TimeUnit timeUnit, + final Runnable runnable) throws InterruptedException { + final long expectedEnd = System.currentTimeMillis() + timeUnit.toMillis(timeout); + + while (true) { + try { + runnable.run(); + return; + } catch (final Throwable t) { + if (expectedEnd <= System.currentTimeMillis()) { + throw new AssertionError(t); + } + Thread.sleep(timeUnit.toMillis(pollInterval)); + } + } + } + + private void waitForKafkaStreamssToEnterRunningState(final Collection kafkaStreamss, + final long time, + final TimeUnit timeUnit) throws InterruptedException { + + final long expectedEnd = System.currentTimeMillis() + timeUnit.toMillis(time); + + kafkaStreamsStatesLock.lock(); + try { + while (!kafkaStreamss.stream().allMatch(kafkaStreams -> kafkaStreamsStates.get(kafkaStreams) == State.RUNNING)) { + if (expectedEnd <= System.currentTimeMillis()) { + fail("one or more kafkaStreamss did not enter RUNNING in a timely manner"); + } + final long millisRemaining = Math.max(1, expectedEnd - System.currentTimeMillis()); + kafkaStreamsStateUpdate.await(millisRemaining, TimeUnit.MILLISECONDS); + } + } finally { + kafkaStreamsStatesLock.unlock(); + } + } + + private KafkaStreams 
createKafkaStreams(final StreamsBuilder builder, final Properties config) { + final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(config), config); + kafkaStreamsStatesLock.lock(); + try { + kafkaStreamsStates.put(kafkaStreams, kafkaStreams.state()); + } finally { + kafkaStreamsStatesLock.unlock(); + } + + kafkaStreams.setStateListener((newState, oldState) -> { + kafkaStreamsStatesLock.lock(); + try { + kafkaStreamsStates.put(kafkaStreams, newState); + if (newState == State.RUNNING) { + if (kafkaStreamsStates.values().stream().allMatch(state -> state == State.RUNNING)) { + kafkaStreamsStateUpdate.signalAll(); + } + } + } finally { + kafkaStreamsStatesLock.unlock(); + } + }); + return kafkaStreams; + } + + private StateRestoreListener createTrackingRestoreListener(final AtomicLong restoreStartOffset, + final AtomicLong restoreEndOffset) { + return new StateRestoreListener() { + @Override + public void onRestoreStart(final TopicPartition topicPartition, + final String storeName, + final long startingOffset, + final long endingOffset) { + restoreStartOffset.set(startingOffset); + restoreEndOffset.set(endingOffset); + } + + @Override + public void onBatchRestored(final TopicPartition topicPartition, final String storeName, + final long batchEndOffset, final long numRestored) { + + } + + @Override + public void onRestoreEnd(final TopicPartition topicPartition, final String storeName, + final long totalRestored) { + + } + }; + } + + private Properties streamsConfiguration() { + final String applicationId = "streamsApp"; + final Properties config = new Properties(); + config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); + config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); + config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + return config; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index c5080d78031..0a79e8459af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -16,29 +16,11 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.AuthorizationException; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Utils; -import 
org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.errors.LockException; -import org.apache.kafka.streams.errors.ProcessorStateException; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.test.InternalMockProcessorContext; -import org.apache.kafka.test.MockRestoreCallback; -import org.apache.kafka.test.MockStateRestoreListener; -import org.apache.kafka.test.TestUtils; -import org.easymock.EasyMock; -import org.junit.Before; -import org.junit.Test; +import static org.apache.kafka.streams.processor.internals.ProcessorTopologyFactories.withLocalStores; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; @@ -49,12 +31,22 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; - -import static org.apache.kafka.streams.processor.internals.ProcessorTopologyFactories.withLocalStores; -import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.LockException; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRestoreCallback; +import org.apache.kafka.test.MockStateRestoreListener; +import org.apache.kafka.test.TestUtils; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; public class AbstractTaskTest { @@ -72,27 +64,6 @@ public class AbstractTaskTest { expect(stateDirectory.directoryForTask(id)).andReturn(TestUtils.tempDirectory()); } - @Test(expected = ProcessorStateException.class) - public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() { - final Consumer consumer = mockConsumer(new AuthorizationException("blah")); - final AbstractTask task = createTask(consumer, Collections.emptyMap()); - task.updateOffsetLimits(); - } - - @Test(expected = ProcessorStateException.class) - public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() { - final Consumer consumer = mockConsumer(new KafkaException("blah")); - final AbstractTask task = createTask(consumer, Collections.emptyMap()); - task.updateOffsetLimits(); - } - - @Test(expected = WakeupException.class) - public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() { - final Consumer consumer = mockConsumer(new WakeupException()); - final AbstractTask task = createTask(consumer, Collections.emptyMap()); - task.updateOffsetLimits(); - } - @Test public void shouldThrowLockExceptionIfFailedToLockStateDirectoryWhenTopologyHasStores() throws IOException { final Consumer consumer = EasyMock.createNiceMock(Consumer.class); @@ -270,14 +241,4 @@ public class AbstractTaskTest { public void initializeTopology() {} }; } - - private Consumer mockConsumer(final RuntimeException toThrow) { - return new MockConsumer(OffsetResetStrategy.EARLIEST) { - 
@Override - public OffsetAndMetadata committed(final TopicPartition partition) { - throw toThrow; - } - }; - } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 2faa078d675..df889798a86 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -554,6 +556,94 @@ public class StandbyTaskTest { ); } + @Test + public void shouldNotGetConsumerCommittedOffsetIfThereAreNoRecordUpdates() throws IOException { + final AtomicInteger committedCallCount = new AtomicInteger(); + + final Consumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + @Override + public synchronized OffsetAndMetadata committed(final TopicPartition partition) { + committedCallCount.getAndIncrement(); + return super.committed(partition); + } + }; + + consumer.assign(Collections.singletonList(globalTopicPartition)); + consumer.commitSync(mkMap(mkEntry(globalTopicPartition, new OffsetAndMetadata(0L)))); + + task = new StandbyTask( + taskId, + ktablePartitions, + ktableTopology, + consumer, + changelogReader, + createConfig(baseDir), + streamsMetrics, + stateDirectory + ); + task.initializeStateStores(); + assertThat(committedCallCount.get(), equalTo(0)); + + task.update(globalTopicPartition, Collections.emptyList()); + // We should not make a consumer.committed() call because there are no new records. 
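+ // The early return added to StandbyTask.update() for empty record lists guarantees this:
+ // with no records nothing can reach the cached offset limit, so the lazy refresh via
+ // consumer.committed() is never triggered.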
+ assertThat(committedCallCount.get(), equalTo(0)); + } + + @Test + public void shouldGetConsumerCommittedOffsetsOncePerCommit() throws IOException { + final AtomicInteger committedCallCount = new AtomicInteger(); + + final Consumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { + @Override + public synchronized OffsetAndMetadata committed(final TopicPartition partition) { + committedCallCount.getAndIncrement(); + return super.committed(partition); + } + }; + + consumer.assign(Collections.singletonList(globalTopicPartition)); + consumer.commitSync(mkMap(mkEntry(globalTopicPartition, new OffsetAndMetadata(0L)))); + + task = new StandbyTask( + taskId, + ktablePartitions, + ktableTopology, + consumer, + changelogReader, + createConfig(baseDir), + streamsMetrics, + stateDirectory + ); + task.initializeStateStores(); + + task.update( + globalTopicPartition, + Collections.singletonList( + makeConsumerRecord(globalTopicPartition, 1, 1) + ) + ); + assertThat(committedCallCount.get(), equalTo(1)); + + task.update( + globalTopicPartition, + Collections.singletonList( + makeConsumerRecord(globalTopicPartition, 1, 1) + ) + ); + // We should not make another consumer.committed() call until we commit + assertThat(committedCallCount.get(), equalTo(1)); + + task.commit(); + task.update( + globalTopicPartition, + Collections.singletonList( + makeConsumerRecord(globalTopicPartition, 1, 1) + ) + ); + // We committed so we're allowed to make another consumer.committed() call + assertThat(committedCallCount.get(), equalTo(2)); + } + @Test public void shouldInitializeStateStoreWithoutException() throws IOException { final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index bf59c21f12c..9ae9d798cb8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -750,6 +750,64 @@ public class StoreChangelogReaderTest { assertThat(callback.restored.size(), equalTo(10)); } + @Test + public void shouldRestoreUpToOffsetLimit() { + setupConsumer(10, topicPartition); + changelogReader.register(new StateRestorer( + topicPartition, + restoreListener, + 2L, + 5, + true, + "storeName1", + identity())); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); + changelogReader.restore(active); + + assertThat(callback.restored.size(), equalTo(3)); + assertAllCallbackStatesExecuted(callback, "storeName1"); + assertCorrectOffsetsReportedByListener(callback, 2L, 4L, 3L); + } + + @Test + public void shouldNotRestoreIfCheckpointIsEqualToOffsetLimit() { + setupConsumer(10, topicPartition); + changelogReader.register(new StateRestorer( + topicPartition, + restoreListener, + 5L, + 5, + true, + "storeName1", + identity())); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); + changelogReader.restore(active); + + assertThat(callback.storeNameCalledStates.size(), equalTo(0)); + assertThat(callback.restored.size(), equalTo(0)); + } + + @Test + public void shouldNotRestoreIfCheckpointIsGreaterThanOffsetLimit() { + setupConsumer(10, topicPartition); + changelogReader.register(new StateRestorer( + topicPartition, + restoreListener, + 10L, + 5, + 
true, + "storeName1", + identity())); + expect(active.restoringTaskFor(topicPartition)).andStubReturn(task); + replay(active, task); + changelogReader.restore(active); + + assertThat(callback.storeNameCalledStates.size(), equalTo(0)); + assertThat(callback.restored.size(), equalTo(0)); + } + private void setupConsumer(final long messages, final TopicPartition topicPartition) { assignPartition(messages, topicPartition); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index fb196fdcda3..cc0af23eed3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -16,15 +16,19 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; @@ -40,6 +44,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.PunctuationType; @@ -77,6 +82,7 @@ import static java.util.Collections.singletonList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; @@ -98,7 +104,7 @@ public class StreamTaskTest { private final String topic2 = "topic2"; private final TopicPartition partition1 = new TopicPartition(topic1, 1); private final TopicPartition partition2 = new TopicPartition(topic2, 1); - private final Set partitions = Utils.mkSet(partition1, partition2); + private final Set partitions = mkSet(partition1, partition2); private final MockSourceNode source1 = new MockSourceNode<>(new String[]{topic1}, intDeserializer, intDeserializer); private final MockSourceNode source2 = new MockSourceNode<>(new String[]{topic2}, intDeserializer, intDeserializer); @@ -1417,7 +1423,7 @@ public class StreamTaskTest { task = new StreamTask( taskId00, - Utils.mkSet(partition1, repartition), + mkSet(partition1, repartition), topology, consumer, changelogReader, @@ -1478,6 +1484,59 @@ public class 
StreamTaskTest { assertEquals(1, producer.history().size()); } + @Test(expected = ProcessorStateException.class) + public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() { + final Consumer consumer = mockConsumerWithCommittedException(new AuthorizationException("message")); + final StreamTask task = createOptimizedStatefulTask(createConfig(false), consumer); + task.initializeStateStores(); + } + + @Test(expected = ProcessorStateException.class) + public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() { + final Consumer consumer = mockConsumerWithCommittedException(new KafkaException("message")); + final AbstractTask task = createOptimizedStatefulTask(createConfig(false), consumer); + task.initializeStateStores(); + } + + @Test(expected = WakeupException.class) + public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() { + final Consumer consumer = mockConsumerWithCommittedException(new WakeupException()); + final AbstractTask task = createOptimizedStatefulTask(createConfig(false), consumer); + task.initializeStateStores(); + } + + private Consumer mockConsumerWithCommittedException(final RuntimeException toThrow) { + return new MockConsumer(OffsetResetStrategy.EARLIEST) { + @Override + public OffsetAndMetadata committed(final TopicPartition partition) { + throw toThrow; + } + }; + } + + private StreamTask createOptimizedStatefulTask(final StreamsConfig config, final Consumer consumer) { + final StateStore stateStore = new MockKeyValueStore(storeName, true); + + final ProcessorTopology topology = ProcessorTopologyFactories.with( + asList(source1), + mkMap(mkEntry(topic1, source1)), + singletonList(stateStore), + Collections.singletonMap(storeName, topic1)); + + return new StreamTask( + taskId00, + mkSet(partition1), + topology, + consumer, + changelogReader, + config, + streamsMetrics, + stateDirectory, + null, + time, + () -> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer)); + } + private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) { final StateStore stateStore = new MockKeyValueStore(storeName, logged); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index f48c31c1d91..a2e4557c27e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -317,10 +317,7 @@ public class StreamThreadStateStoreProviderTest { stateDirectory, null, new MockTime(), - () -> clientSupplier.getProducer(new HashMap<>())) { - @Override - protected void updateOffsetLimits() {} - }; + () -> clientSupplier.getProducer(new HashMap<>())); } private void mockThread(final boolean initialized) {