mirror of https://github.com/apache/kafka.git

KAFKA-8602: Backport bugfix for standby task creation (#7146)
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent 57352bb163
commit 79eaddd25f
|  | @@ -487,7 +487,7 @@ public class StreamThread extends Thread { | |||
| 
 | ||||
|             final ProcessorTopology topology = builder.build(taskId.topicGroupId); | ||||
| 
 | ||||
|             if (!topology.stateStores().isEmpty()) { | ||||
|             if (!topology.stateStores().isEmpty() && !topology.storeToChangelogTopic().isEmpty()) { | ||||
|                 return new StandbyTask( | ||||
|                     taskId, | ||||
|                     partitions, | ||||
|  |  | |||
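For context, the changed condition above is the heart of the fix: a standby task is only useful when at least one state store in the topology has a changelog topic to restore from, and topology.storeToChangelogTopic() is empty when every store was built with logging disabled. A minimal sketch of that distinction, assuming only the public Stores/StoreBuilder API (the class and store names below are hypothetical):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class ChangelogSketch {

    // Logging is enabled by default, so this store gets a changelog topic and a
    // standby task has something to replicate.
    static final StoreBuilder<KeyValueStore<Integer, Integer>> WITH_CHANGELOG =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store-with-changelog"),
                                    Serdes.Integer(), Serdes.Integer());

    // With logging disabled there is no changelog topic; before this fix a standby
    // task was still created for such a topology even though it had nothing to restore.
    static final StoreBuilder<KeyValueStore<Integer, Integer>> WITHOUT_CHANGELOG =
        Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store-without-changelog"),
                                    Serdes.Integer(), Serdes.Integer())
            .withLoggingDisabled();
}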
|  | @@ -0,0 +1,159 @@ | |||
| /* | ||||
|  * Licensed to the Apache Software Foundation (ASF) under one or more | ||||
|  * contributor license agreements. See the NOTICE file distributed with | ||||
|  * this work for additional information regarding copyright ownership. | ||||
|  * The ASF licenses this file to You under the Apache License, Version 2.0 | ||||
|  * (the "License"); you may not use this file except in compliance with | ||||
|  * the License. You may obtain a copy of the License at | ||||
|  * | ||||
|  *    http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| package org.apache.kafka.streams.integration; | ||||
| 
 | ||||
| import org.apache.kafka.common.serialization.Serdes; | ||||
| import org.apache.kafka.streams.KafkaStreams; | ||||
| import org.apache.kafka.streams.KafkaStreams.State; | ||||
| import org.apache.kafka.streams.KeyValue; | ||||
| import org.apache.kafka.streams.StreamsBuilder; | ||||
| import org.apache.kafka.streams.StreamsConfig; | ||||
| import org.apache.kafka.streams.Topology; | ||||
| import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; | ||||
| import org.apache.kafka.streams.kstream.Consumed; | ||||
| import org.apache.kafka.streams.kstream.Transformer; | ||||
| import org.apache.kafka.streams.processor.ProcessorContext; | ||||
| import org.apache.kafka.streams.processor.ThreadMetadata; | ||||
| import org.apache.kafka.streams.state.KeyValueStore; | ||||
| import org.apache.kafka.streams.state.StoreBuilder; | ||||
| import org.apache.kafka.streams.state.Stores; | ||||
| import org.apache.kafka.test.IntegrationTest; | ||||
| import org.apache.kafka.test.TestUtils; | ||||
| import org.junit.After; | ||||
| import org.junit.BeforeClass; | ||||
| import org.junit.ClassRule; | ||||
| import org.junit.Test; | ||||
| import org.junit.experimental.categories.Category; | ||||
| 
 | ||||
| import java.util.Properties; | ||||
| import java.util.function.Predicate; | ||||
| 
 | ||||
| @Category({IntegrationTest.class}) | ||||
| public class StandbyTaskCreationIntegrationTest { | ||||
| 
 | ||||
|     private static final int NUM_BROKERS = 1; | ||||
| 
 | ||||
|     @ClassRule | ||||
|     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); | ||||
| 
 | ||||
|     private static final String INPUT_TOPIC = "input-topic"; | ||||
| 
 | ||||
|     private KafkaStreams client1; | ||||
|     private KafkaStreams client2; | ||||
|     private volatile boolean client1IsOk = false; | ||||
|     private volatile boolean client2IsOk = false; | ||||
| 
 | ||||
|     @BeforeClass | ||||
|     public static void createTopics() throws InterruptedException { | ||||
|         CLUSTER.createTopic(INPUT_TOPIC, 2, 1); | ||||
|     } | ||||
| 
 | ||||
|     @After | ||||
|     public void after() { | ||||
|         client1.close(); | ||||
|         client2.close(); | ||||
|     } | ||||
| 
 | ||||
|     private Properties streamsConfiguration() { | ||||
|         final String applicationId = "testApp"; | ||||
|         final Properties streamsConfiguration = new Properties(); | ||||
|         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); | ||||
|         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); | ||||
|         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); | ||||
|         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); | ||||
|         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); | ||||
|         streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); | ||||
|         return streamsConfiguration; | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws Exception { | ||||
|         final StreamsBuilder builder = new StreamsBuilder(); | ||||
|         final String stateStoreName = "myTransformState"; | ||||
|         final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder = | ||||
|             Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName), | ||||
|                                         Serdes.Integer(), | ||||
|                                         Serdes.Integer()).withLoggingDisabled(); | ||||
|         builder.addStateStore(keyValueStoreBuilder); | ||||
|         builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer())) | ||||
|             .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() { | ||||
|                 @SuppressWarnings("unchecked") | ||||
|                 @Override | ||||
|                 public void init(final ProcessorContext context) {} | ||||
| 
 | ||||
|                 @Override | ||||
|                 public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) { | ||||
|                     return null; | ||||
|                 } | ||||
| 
 | ||||
|                 @Override | ||||
|                 public void close() {} | ||||
|             }, stateStoreName); | ||||
| 
 | ||||
|         final Topology topology = builder.build(); | ||||
|         createClients(topology, streamsConfiguration(), topology, streamsConfiguration()); | ||||
| 
 | ||||
|         setStateListenersForVerification(thread -> thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty()); | ||||
| 
 | ||||
|         startClients(); | ||||
| 
 | ||||
|         waitUntilBothClientAreOK( | ||||
|             "At least one client did not reach state RUNNING with active tasks but no stand-by tasks" | ||||
|         ); | ||||
|     } | ||||
| 
 | ||||
|     private void createClients(final Topology topology1, | ||||
|                                final Properties streamsConfiguration1, | ||||
|                                final Topology topology2, | ||||
|                                final Properties streamsConfiguration2) { | ||||
| 
 | ||||
|         client1 = new KafkaStreams(topology1, streamsConfiguration1); | ||||
|         client2 = new KafkaStreams(topology2, streamsConfiguration2); | ||||
|     } | ||||
| 
 | ||||
|     private void setStateListenersForVerification(final Predicate<ThreadMetadata> taskCondition) { | ||||
|         client1.setStateListener((newState, oldState) -> { | ||||
|             if (newState == State.RUNNING && | ||||
|                 client1.localThreadsMetadata().stream().allMatch(taskCondition)) { | ||||
| 
 | ||||
|                 client1IsOk = true; | ||||
|             } | ||||
|         }); | ||||
|         client2.setStateListener((newState, oldState) -> { | ||||
|             if (newState == State.RUNNING && | ||||
|                 client2.localThreadsMetadata().stream().allMatch(taskCondition)) { | ||||
| 
 | ||||
|                 client2IsOk = true; | ||||
|             } | ||||
|         }); | ||||
|     } | ||||
| 
 | ||||
|     private void startClients() { | ||||
|         client1.start(); | ||||
|         client2.start(); | ||||
|     } | ||||
| 
 | ||||
|     private void waitUntilBothClientAreOK(final String message) throws Exception { | ||||
|         TestUtils.waitForCondition( | ||||
|             () -> client1IsOk && client2IsOk, | ||||
|             30 * 1000, | ||||
|             message + ": " | ||||
|                 + "Client 1 is " + (!client1IsOk ? "NOT " : "") + "OK, " | ||||
|                 + "client 2 is " + (!client2IsOk ? "NOT " : "") + "OK." | ||||
|         ); | ||||
|     } | ||||
| } | ||||
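The test above pins down the negative case only. A hypothetical companion test, not part of this commit, could reuse the same helpers with changelogging left enabled on the store and flip the verification condition, along these lines:

    // Hypothetical companion test (not part of this commit): the same topology, but with
    // changelogging left enabled on the store, should yield standby tasks on both clients.
    @Test
    public void shouldCreateStandbyTasksForStateStoreWithLoggingEnabled() throws Exception {
        final StreamsBuilder builder = new StreamsBuilder();
        final String stateStoreName = "myTransformState";
        builder.addStateStore(
            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
                                        Serdes.Integer(),
                                        Serdes.Integer()));   // logging stays enabled by default
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
                @Override
                public void init(final ProcessorContext context) {}

                @Override
                public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
                    return null;
                }

                @Override
                public void close() {}
            }, stateStoreName);

        final Topology topology = builder.build();
        createClients(topology, streamsConfiguration(), topology, streamsConfiguration());
        setStateListenersForVerification(thread -> !thread.standbyTasks().isEmpty());
        startClients();
        waitUntilBothClientAreOK("At least one client did not reach state RUNNING with stand-by tasks");
    }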
|  | @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; | |||
| import org.apache.kafka.clients.consumer.ConsumerRecord; | ||||
| import org.apache.kafka.clients.consumer.InvalidOffsetException; | ||||
| import org.apache.kafka.clients.consumer.MockConsumer; | ||||
| import org.apache.kafka.clients.consumer.OffsetResetStrategy; | ||||
| import org.apache.kafka.clients.producer.MockProducer; | ||||
| import org.apache.kafka.clients.producer.Producer; | ||||
| import org.apache.kafka.common.Cluster; | ||||
|  | @@ -56,11 +57,15 @@ import org.apache.kafka.streams.processor.Punctuator; | |||
| import org.apache.kafka.streams.processor.TaskId; | ||||
| import org.apache.kafka.streams.processor.TaskMetadata; | ||||
| import org.apache.kafka.streams.processor.ThreadMetadata; | ||||
| import org.apache.kafka.streams.processor.internals.StreamThread.StreamsMetricsThreadImpl; | ||||
| import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; | ||||
| import org.apache.kafka.streams.state.KeyValueStore; | ||||
| import org.apache.kafka.streams.state.StoreBuilder; | ||||
| import org.apache.kafka.streams.state.internals.OffsetCheckpoint; | ||||
| import org.apache.kafka.test.MockClientSupplier; | ||||
| import org.apache.kafka.test.MockProcessor; | ||||
| import org.apache.kafka.test.MockStateRestoreListener; | ||||
| import org.apache.kafka.test.MockStoreBuilder; | ||||
| import org.apache.kafka.test.MockTimestampExtractor; | ||||
| import org.apache.kafka.test.TestCondition; | ||||
| import org.apache.kafka.test.TestUtils; | ||||
|  | @@ -68,6 +73,7 @@ import org.easymock.EasyMock; | |||
| import org.junit.Assert; | ||||
| import org.junit.Before; | ||||
| import org.junit.Test; | ||||
| import org.slf4j.Logger; | ||||
| 
 | ||||
| import java.io.File; | ||||
| import java.io.IOException; | ||||
|  | @@ -88,12 +94,14 @@ import static org.apache.kafka.common.utils.Utils.mkMap; | |||
| import static org.apache.kafka.common.utils.Utils.mkProperties; | ||||
| import static org.apache.kafka.streams.processor.internals.AbstractStateManager.CHECKPOINT_FILE_NAME; | ||||
| import static org.hamcrest.CoreMatchers.equalTo; | ||||
| import static org.hamcrest.CoreMatchers.not; | ||||
| import static org.hamcrest.CoreMatchers.nullValue; | ||||
| import static org.hamcrest.MatcherAssert.assertThat; | ||||
| import static org.junit.Assert.assertEquals; | ||||
| import static org.junit.Assert.assertFalse; | ||||
| import static org.junit.Assert.assertNotEquals; | ||||
| import static org.junit.Assert.assertNotNull; | ||||
| import static org.junit.Assert.assertSame; | ||||
| import static org.junit.Assert.assertThat; | ||||
| import static org.junit.Assert.assertTrue; | ||||
| import static org.junit.Assert.fail; | ||||
| 
 | ||||
|  | @@ -907,6 +915,61 @@ public class StreamThreadTest { | |||
|         assertEquals(0, thread.standbyRecords().size()); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void shouldCreateStandbyTask() { | ||||
|         setupInternalTopologyWithoutState(); | ||||
|         internalTopologyBuilder.addStateStore(new MockStoreBuilder("myStore", true), "processor1"); | ||||
| 
 | ||||
|         final StandbyTask standbyTask = createStandbyTask(); | ||||
| 
 | ||||
|         assertThat(standbyTask, not(nullValue())); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void shouldNotCreateStandbyTaskWithoutStateStores() { | ||||
|         setupInternalTopologyWithoutState(); | ||||
| 
 | ||||
|         final StandbyTask standbyTask = createStandbyTask(); | ||||
| 
 | ||||
|         assertThat(standbyTask, nullValue()); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled() { | ||||
|         setupInternalTopologyWithoutState(); | ||||
|         final StoreBuilder storeBuilder = new MockStoreBuilder("myStore", true); | ||||
|         storeBuilder.withLoggingDisabled(); | ||||
|         internalTopologyBuilder.addStateStore(storeBuilder, "processor1"); | ||||
| 
 | ||||
|         final StandbyTask standbyTask = createStandbyTask(); | ||||
| 
 | ||||
|         assertThat(standbyTask, nullValue()); | ||||
|     } | ||||
| 
 | ||||
|     private void setupInternalTopologyWithoutState() { | ||||
|         final MockProcessor mockProcessor = new MockProcessor(); | ||||
|         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); | ||||
|         internalTopologyBuilder.addProcessor("processor1", () -> mockProcessor, "source1"); | ||||
|     } | ||||
| 
 | ||||
|     private StandbyTask createStandbyTask() { | ||||
|         final LogContext logContext = new LogContext("test"); | ||||
|         final Logger log = logContext.logger(StreamThreadTest.class); | ||||
|         final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(metrics, clientId); | ||||
|         final StreamThread.StandbyTaskCreator standbyTaskCreator = new StreamThread.StandbyTaskCreator( | ||||
|             internalTopologyBuilder, | ||||
|             config, | ||||
|             streamsMetrics, | ||||
|             stateDirectory, | ||||
|             new MockChangelogReader(), | ||||
|             mockTime, | ||||
|             log); | ||||
|         return standbyTaskCreator.createTask( | ||||
|             new MockConsumer<>(OffsetResetStrategy.EARLIEST), | ||||
|             new TaskId(1, 2), | ||||
|             Collections.emptySet()); | ||||
|     } | ||||
| 
 | ||||
|     @Test | ||||
|     public void shouldPunctuateActiveTask() { | ||||
|         final List<Long> punctuatedStreamTime = new ArrayList<>(); | ||||
|  |  | |||