KAFKA-10689: fix windowed FKJ topology and put checks in assignor to avoid infinite loops (#9568)

Fix the infinite loop in the assignor when trying to resolve the number of partitions in a topology with a windowed FKJ (foreign-key join). Also add a check to this loop so that we break out and fail the application if we detect that we are, or will be, stuck in an infinite loop.

Reviewers: Matthias Sax <matthias@confluent.io>
A. Sophie Blee-Goldman, 2020-11-17 16:57:53 -08:00 (committed by GitHub)
parent 7a23e592f4
commit e71cb7ab11
5 changed files with 135 additions and 23 deletions
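For context before the file diffs: the topology shape that triggers the loop is a windowed aggregation whose result is then foreign-key-joined against a table, which is exactly what the new integration test below builds. A minimal standalone sketch of that shape (topic names, serdes, window size, and class name here are illustrative, not taken from this commit):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedFkjExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> input = builder.stream("input-topic");
        final KTable<String, String> table = builder.table("table-topic");

        // Windowed aggregation feeding a foreign-key join: before this fix, the assignor
        // could loop forever trying to resolve the partition counts of the internal
        // repartition topics that this topology creates.
        input
            .groupBy((k, v) -> k, Grouped.with("group", Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
            .aggregate(() -> "", (key, value, agg) -> agg + value)
            .leftJoin(table, value -> value, (left, right) -> left + right);

        // Building the topology is enough to exercise the assignor's partition
        // resolution once the application is started.
        builder.build();
    }
}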

checkstyle/suppressions.xml

@@ -185,7 +185,7 @@
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="(StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>
+              files="(StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>

     <suppress checks="MethodLength"
               files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>

streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSinkNode.java

@@ -51,18 +51,21 @@ public class StreamSinkNode<K, V> extends StreamsGraphNode {
     }

     @Override
+    @SuppressWarnings("unchecked")
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         final Serializer<K> keySerializer = producedInternal.keySerde() == null ? null : producedInternal.keySerde().serializer();
         final Serializer<V> valSerializer = producedInternal.valueSerde() == null ? null : producedInternal.valueSerde().serializer();
-        final StreamPartitioner<? super K, ? super V> partitioner = producedInternal.streamPartitioner();
         final String[] parentNames = parentNodeNames();

-        if (partitioner == null && keySerializer instanceof WindowedSerializer) {
-            @SuppressWarnings("unchecked")
-            final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>((WindowedSerializer) keySerializer);
-            topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, windowedPartitioner, parentNames);
-        } else if (topicNameExtractor instanceof StaticTopicNameExtractor) {
-            final String topicName = ((StaticTopicNameExtractor) topicNameExtractor).topicName;
+        final StreamPartitioner<? super K, ? super V> partitioner;
+        if (producedInternal.streamPartitioner() == null && keySerializer instanceof WindowedSerializer) {
+            partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<K, V>((WindowedSerializer<K>) keySerializer);
+        } else {
+            partitioner = producedInternal.streamPartitioner();
+        }
+
+        if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+            final String topicName = ((StaticTopicNameExtractor<K, V>) topicNameExtractor).topicName;
             topologyBuilder.addSink(nodeName(), topicName, keySerializer, valSerializer, partitioner, parentNames);
         } else {
             topologyBuilder.addSink(nodeName(), topicNameExtractor, keySerializer, valSerializer, partitioner, parentNames);

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java

@@ -530,10 +530,11 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
         boolean numPartitionsNeeded;
         do {
             numPartitionsNeeded = false;
+            boolean progressMadeThisIteration = false;  // avoid infinitely looping without making any progress on unknown repartitions

             for (final TopicsInfo topicsInfo : topicGroups.values()) {
-                for (final String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
-                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(topicName)
+                for (final String repartitionSourceTopic : topicsInfo.repartitionSourceTopics.keySet()) {
+                    final Optional<Integer> maybeNumPartitions = repartitionTopicMetadata.get(repartitionSourceTopic)
                         .numberOfPartitions();
                     Integer numPartitions = null;

@@ -542,24 +543,24 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
                        for (final TopicsInfo otherTopicsInfo : topicGroups.values()) {
                            final Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;

-                           if (otherSinkTopics.contains(topicName)) {
+                           if (otherSinkTopics.contains(repartitionSourceTopic)) {
                                // if this topic is one of the sink topics of this topology,
                                // use the maximum of all its source topic partitions as the number of partitions
-                               for (final String sourceTopicName : otherTopicsInfo.sourceTopics) {
+                               for (final String upstreamSourceTopic : otherTopicsInfo.sourceTopics) {
                                    Integer numPartitionsCandidate = null;
                                    // It is possible the sourceTopic is another internal topic, i.e,
                                    // map().join().join(map())
-                                   if (repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                       if (repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().isPresent()) {
+                                   if (repartitionTopicMetadata.containsKey(upstreamSourceTopic)) {
+                                       if (repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().isPresent()) {
                                            numPartitionsCandidate =
-                                               repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions().get();
+                                               repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().get();
                                        }
                                    } else {
-                                       final Integer count = metadata.partitionCountForTopic(sourceTopicName);
+                                       final Integer count = metadata.partitionCountForTopic(upstreamSourceTopic);
                                        if (count == null) {
                                            throw new TaskAssignmentException(
                                                "No partition count found for source topic "
-                                                   + sourceTopicName
+                                                   + upstreamSourceTopic
                                                + ", but it should have been."
                                            );
                                        }
@@ -575,16 +576,20 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
                                    }
                                }

-                           // if we still have not found the right number of partitions,
-                           // another iteration is needed
                            if (numPartitions == null) {
                                numPartitionsNeeded = true;
+                               log.trace("Unable to determine number of partitions for {}, another iteration is needed",
+                                         repartitionSourceTopic);
                            } else {
-                               repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
+                               repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions);
+                               progressMadeThisIteration = true;
                            }
                        }
                    }
                }

+               if (!progressMadeThisIteration && numPartitionsNeeded) {
+                   throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics");
+               }
            } while (numPartitionsNeeded);
        }
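To spell out the guard added above: the do-while loop is a fixed-point iteration in which each repartition topic's partition count may depend on counts resolved for other topics in earlier passes. The new progressMadeThisIteration flag turns "a full pass resolved nothing while unknowns remain" into an immediate failure rather than another identical pass. A self-contained sketch of that pattern (class and method names are illustrative, and a plain IllegalStateException stands in for the assignor's TaskAssignmentException):

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class PartitionCountResolver {

    // Repeatedly tries to resolve a partition count for every topic, using a resolver
    // that may depend on counts discovered in earlier passes. If a full pass completes
    // without resolving anything new while unknowns remain, the dependencies can never
    // converge, so we fail fast instead of looping forever.
    static Map<String, Integer> resolveAll(final Iterable<String> topics,
                                           final Function<String, Optional<Integer>> resolver) {
        final Map<String, Integer> resolved = new HashMap<>();
        boolean unresolvedRemaining;
        do {
            unresolvedRemaining = false;
            boolean progressMadeThisIteration = false;  // mirrors the flag added in the assignor
            for (final String topic : topics) {
                if (resolved.containsKey(topic)) {
                    continue;
                }
                final Optional<Integer> count = resolver.apply(topic);
                if (count.isPresent()) {
                    resolved.put(topic, count.get());
                    progressMadeThisIteration = true;
                } else {
                    unresolvedRemaining = true;
                }
            }
            if (!progressMadeThisIteration && unresolvedRemaining) {
                throw new IllegalStateException("Failed to compute number of partitions for all repartition topics");
            }
        } while (unresolvedRemaining);
        return resolved;
    }
}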

streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java

@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.integration;

+import java.time.Duration;
 import kafka.log.LogConfig;
 import kafka.utils.MockTime;
 import org.apache.kafka.clients.admin.Admin;
@@ -33,7 +34,9 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -59,6 +62,8 @@ import java.util.concurrent.TimeUnit;

 import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofSeconds;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -73,6 +78,7 @@ public class InternalTopicIntegrationTest {

     private static final String APP_ID = "internal-topics-integration-test";
     private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
+    private static final String DEFAULT_INPUT_TABLE_TOPIC = "inputTable";

     private final MockTime mockTime = CLUSTER.time;
@@ -80,7 +86,7 @@ public class InternalTopicIntegrationTest {

     @BeforeClass
     public static void startKafkaCluster() throws InterruptedException {
-        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC);
+        CLUSTER.createTopics(DEFAULT_INPUT_TOPIC, DEFAULT_INPUT_TABLE_TOPIC);
     }

     @Before
@@ -135,6 +141,37 @@
         return Admin.create(adminClientConfig);
     }

+    /*
+     * This test just ensures that that the assignor does not get stuck during partition number resolution
+     * for internal repartition topics. See KAFKA-10689
+     */
+    @Test
+    public void shouldGetToRunningWithWindowedTableInFKJ() throws Exception {
+        final String appID = APP_ID + "-windowed-FKJ";
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final KStream<String, String> inputTopic = streamsBuilder.stream(DEFAULT_INPUT_TOPIC);
+        final KTable<String, String> inputTable = streamsBuilder.table(DEFAULT_INPUT_TABLE_TOPIC);
+        inputTopic
+            .groupBy(
+                (k, v) -> k,
+                Grouped.with("GroupName", Serdes.String(), Serdes.String())
+            )
+            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+            .aggregate(
+                () -> "",
+                (k, v, a) -> a + k)
+            .leftJoin(
+                inputTable,
+                v -> v,
+                (x, y) -> x + y
+            );
+
+        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsProp);
+        startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(60));
+    }
+
     @Test
     public void shouldCompactTopicsForKeyValueStoreChangelogs() {
         final String appID = APP_ID + "-compact";

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java

@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals;

+import java.time.Duration;
 import java.util.Properties;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -36,18 +37,24 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
+import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
@@ -88,6 +95,7 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -1053,7 +1061,7 @@ public class StreamsPartitionAssignorTest {
         EasyMock.verify(streamsMetadataState);
         EasyMock.verify(taskManager);

-        assertEquals(Collections.singleton(t3p0.topic()), capturedCluster.getValue().topics());
+        assertEquals(singleton(t3p0.topic()), capturedCluster.getValue().topics());
         assertEquals(2, capturedCluster.getValue().partitionsForTopic(t3p0.topic()).size());
     }

@@ -2015,6 +2023,65 @@
         assertEquals(-128, partitionAssignor.uniqueField());
     }

+    @Test
+    public void shouldThrowTaskAssignmentExceptionWhenUnableToResolvePartitionCount() {
+        builder = new CorruptedInternalTopologyBuilder();
+        final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder);
+
+        final KStream<String, String> inputTopic = streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>());
+        final KTable<String, String> inputTable = streamsBuilder.table("topic2", new ConsumedInternal<>(), new MaterializedInternal<>(Materialized.as("store")));
+        inputTopic
+            .groupBy(
+                (k, v) -> k,
+                Grouped.with("GroupName", Serdes.String(), Serdes.String())
+            )
+            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+            .aggregate(
+                () -> "",
+                (k, v, a) -> a + k)
+            .leftJoin(
+                inputTable,
+                v -> v,
+                (x, y) -> x + y
+            );
+        streamsBuilder.buildAndOptimizeTopology();
+
+        configureDefault();
+
+        subscriptions.put("consumer",
+                          new Subscription(
+                              singletonList("topic"),
+                              defaultSubscriptionInfo.encode()
+                          ));
+        final Map<String, Assignment> assignments = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment();
+
+        assertThat(AssignmentInfo.decode(assignments.get("consumer").userData()).errCode(),
+                   equalTo(AssignorError.ASSIGNMENT_ERROR.code()));
+    }
+
+    private static class CorruptedInternalTopologyBuilder extends InternalTopologyBuilder {
+        private Map<Integer, TopicsInfo> corruptedTopicGroups;
+
+        @Override
+        public synchronized Map<Integer, TopicsInfo> topicGroups() {
+            if (corruptedTopicGroups == null) {
+                corruptedTopicGroups = new HashMap<>();
+                for (final Map.Entry<Integer, TopicsInfo> topicGroupEntry : super.topicGroups().entrySet()) {
+                    final TopicsInfo originalInfo = topicGroupEntry.getValue();
+                    corruptedTopicGroups.put(
+                        topicGroupEntry.getKey(),
+                        new TopicsInfo(
+                            emptySet(),
+                            originalInfo.sourceTopics,
+                            originalInfo.repartitionSourceTopics,
+                            originalInfo.stateChangelogTopics
+                        ));
+                }
+            }
+            return corruptedTopicGroups;
+        }
+    }
+
     private static ByteBuffer encodeFutureSubscription() {
         final ByteBuffer buf = ByteBuffer.allocate(4 /* used version */ + 4 /* supported version */);
         buf.putInt(LATEST_SUPPORTED_VERSION + 1);