mirror of https://github.com/apache/kafka.git
Compare commits
9 Commits
0557bb7a3e
...
bf7ad70f2b
Author | SHA1 | Date |
---|---|---|
|
bf7ad70f2b | |
|
aae4160f79 | |
|
7a3c276c5d | |
|
4ca6f92f49 | |
|
4140bc4d16 | |
|
5d45822e88 | |
|
1bd0f398d8 | |
|
5fa9cd0b46 | |
|
cc130d251a |
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.streams.processor.internals;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.header.internals.RecordHeaders;
|
||||
|
||||
|
||||
public class InitProcessorRecordContext extends ProcessorRecordContext {
|
||||
|
||||
private final long initTime;
|
||||
private static final long NO_OFFSET = -1;
|
||||
private static final int NO_PARTITION = -1;
|
||||
|
||||
public InitProcessorRecordContext(final long currentTimestamp) {
|
||||
super(ConsumerRecord.NO_TIMESTAMP, NO_OFFSET, NO_PARTITION, null, new RecordHeaders());
|
||||
this.initTime = currentTimestamp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long timestamp() {
|
||||
return initTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public boolean equals(final Object o) {
|
||||
return super.equals(o);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public int hashCode() {
|
||||
return super.hashCode();
|
||||
}
|
||||
|
||||
}
|
|
@ -1104,11 +1104,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
|
|||
// initialize the task by initializing all its processor nodes in the topology
|
||||
log.trace("Initializing processor nodes of the topology");
|
||||
for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
|
||||
processorContext.setCurrentNode(node);
|
||||
final InitProcessorRecordContext initContext = new InitProcessorRecordContext(time.milliseconds());
|
||||
updateProcessorContext(node, time.milliseconds(), initContext);
|
||||
try {
|
||||
node.init(processorContext, processingExceptionHandler);
|
||||
} finally {
|
||||
processorContext.setCurrentNode(null);
|
||||
updateProcessorContext(null, ConsumerRecord.NO_TIMESTAMP, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -867,6 +867,23 @@ public class ProcessorTopologyTest {
|
|||
equalTo(new TestRecord<>("key3", "value3", null, 3000L)));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testTopologyInitializationWithInitialKeyAndValue() {
|
||||
final String initialKey = "key1";
|
||||
final String initialValue = "value1";
|
||||
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
|
||||
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String());
|
||||
topology.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1);
|
||||
topology.addProcessor("processor1", defineWithStores(() -> new StatefulProcessorWithInitialization(DEFAULT_STORE_NAME, initialKey, initialValue), Collections.singleton(storeBuilder)), "source1");
|
||||
driver = new TopologyTestDriver(topology, props);
|
||||
final KeyValueStore<String, String> store = driver.getKeyValueStore(DEFAULT_STORE_NAME);
|
||||
final List<KeyValue<String, String>> results = prefixScanResults(store, DEFAULT_PREFIX);
|
||||
assertEquals(1, results.size());
|
||||
assertEquals(initialValue, results.get(0).value);
|
||||
assertEquals(initialKey, results.get(0).key);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldCreateStringWithSourceAndTopics() {
|
||||
topology.addSource("source", "topic1", "topic2");
|
||||
|
@ -1264,6 +1281,31 @@ public class ProcessorTopologyTest {
|
|||
}
|
||||
}
|
||||
|
||||
private static class StatefulProcessorWithInitialization implements Processor<String, String, Void, Void> {
|
||||
private KeyValueStore<String, String> store;
|
||||
private final String storeName;
|
||||
private final String initialKey;
|
||||
private final String initialValue;
|
||||
|
||||
public StatefulProcessorWithInitialization(final String storeName, final String initialKey, final String initialValue) {
|
||||
this.storeName = storeName;
|
||||
this.initialKey = initialKey;
|
||||
this.initialValue = initialValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(final ProcessorContext<Void, Void> context) {
|
||||
store = context.getStateStore(storeName);
|
||||
store.put(initialKey, initialValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(final Record<String, String> record) {
|
||||
store.put(record.key(), record.value());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A processor that stores each key-value pair in an in-memory key-value store registered with the context.
|
||||
*/
|
||||
|
|
|
@ -493,7 +493,6 @@ public class TopologyTestDriver implements Closeable {
|
|||
streamsMetrics,
|
||||
cache
|
||||
);
|
||||
context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
|
||||
|
||||
task = new StreamTask(
|
||||
TASK_ID,
|
||||
|
|
Loading…
Reference in New Issue