Compare commits

...

9 Commits

Author SHA1 Message Date
Eduwer Camacaro bf7ad70f2b
Merge aae4160f79 into c6bbbbe24d 2025-10-07 07:03:09 -05:00
Eduwer Camacaro aae4160f79 Change to static ts for init processor context 2025-10-06 09:27:43 -05:00
Eduwer Camacaro 7a3c276c5d Disable logging for unit test 2025-10-06 09:16:55 -05:00
Eduwer Camacaro 4ca6f92f49 Enable logging 2025-10-03 11:53:26 -05:00
Eduwer Camacaro 4140bc4d16 Refactor TopologyTestDriver setup 2025-10-03 10:57:56 -05:00
Eduwer Camacaro 5d45822e88 Add test 2025-10-03 09:44:55 -05:00
Eduwer Camacaro 1bd0f398d8 Apply suggestions 2025-09-24 10:02:01 -05:00
Eduwer Camacaro 5fa9cd0b46 Subclass for init record contexts 2025-09-19 11:39:09 -05:00
Eduwer Camacaro cc130d251a Set dummy ProcessorRecordContext for processor init 2025-08-25 08:09:20 -05:00
4 changed files with 96 additions and 3 deletions

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
public class InitProcessorRecordContext extends ProcessorRecordContext {
private final long initTime;
private static final long NO_OFFSET = -1;
private static final int NO_PARTITION = -1;
public InitProcessorRecordContext(final long currentTimestamp) {
super(ConsumerRecord.NO_TIMESTAMP, NO_OFFSET, NO_PARTITION, null, new RecordHeaders());
this.initTime = currentTimestamp;
}
@Override
public long timestamp() {
return initTime;
}
@Override
@Deprecated
public boolean equals(final Object o) {
return super.equals(o);
}
@Override
@Deprecated
public int hashCode() {
return super.hashCode();
}
}

View File

@ -1104,11 +1104,12 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
// initialize the task by initializing all its processor nodes in the topology
log.trace("Initializing processor nodes of the topology");
for (final ProcessorNode<?, ?, ?, ?> node : topology.processors()) {
processorContext.setCurrentNode(node);
final InitProcessorRecordContext initContext = new InitProcessorRecordContext(time.milliseconds());
updateProcessorContext(node, time.milliseconds(), initContext);
try {
node.init(processorContext, processingExceptionHandler);
} finally {
processorContext.setCurrentNode(null);
updateProcessorContext(null, ConsumerRecord.NO_TIMESTAMP, null);
}
}
}

View File

@ -867,6 +867,23 @@ public class ProcessorTopologyTest {
equalTo(new TestRecord<>("key3", "value3", null, 3000L)));
}
@Test
public void testTopologyInitializationWithInitialKeyAndValue() {
final String initialKey = "key1";
final String initialValue = "value1";
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String());
topology.addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1);
topology.addProcessor("processor1", defineWithStores(() -> new StatefulProcessorWithInitialization(DEFAULT_STORE_NAME, initialKey, initialValue), Collections.singleton(storeBuilder)), "source1");
driver = new TopologyTestDriver(topology, props);
final KeyValueStore<String, String> store = driver.getKeyValueStore(DEFAULT_STORE_NAME);
final List<KeyValue<String, String>> results = prefixScanResults(store, DEFAULT_PREFIX);
assertEquals(1, results.size());
assertEquals(initialValue, results.get(0).value);
assertEquals(initialKey, results.get(0).key);
}
@Test
public void shouldCreateStringWithSourceAndTopics() {
topology.addSource("source", "topic1", "topic2");
@ -1264,6 +1281,31 @@ public class ProcessorTopologyTest {
}
}
private static class StatefulProcessorWithInitialization implements Processor<String, String, Void, Void> {
private KeyValueStore<String, String> store;
private final String storeName;
private final String initialKey;
private final String initialValue;
public StatefulProcessorWithInitialization(final String storeName, final String initialKey, final String initialValue) {
this.storeName = storeName;
this.initialKey = initialKey;
this.initialValue = initialValue;
}
@Override
public void init(final ProcessorContext<Void, Void> context) {
store = context.getStateStore(storeName);
store.put(initialKey, initialValue);
}
@Override
public void process(final Record<String, String> record) {
store.put(record.key(), record.value());
}
}
/**
* A processor that stores each key-value pair in an in-memory key-value store registered with the context.
*/

View File

@ -493,7 +493,6 @@ public class TopologyTestDriver implements Closeable {
streamsMetrics,
cache
);
context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
task = new StreamTask(
TASK_ID,