KAFKA-18026: KIP-1112 migrate KTableSuppressProcessorSupplier (#18150)

Migrates KTableSuppressProcessorSupplier to use the the ProcessorSupplier#stores() method

Reviewers: Guozhang Wang <guozhang.wang.us@gmail.com>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Almog Gavra 2024-12-17 23:45:49 -08:00 committed by GitHub
parent 501da383fa
commit 4bcbf9fae7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 66 additions and 20 deletions

View File

@ -69,7 +69,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.StoreBuilder;
@ -561,12 +560,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final String storeName = final String storeName =
suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME); suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME);
final ProcessorSupplier<K, Change<V>, K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
suppressedInternal,
storeName,
this
);
final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> storeBuilder; final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> storeBuilder;
if (suppressedInternal.bufferConfig().isLoggingEnabled()) { if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
@ -584,10 +577,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
.withLoggingDisabled(); .withLoggingDisabled();
} }
final ProcessorSupplier<K, Change<V>, K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>(
suppressedInternal,
storeBuilder,
this
);
final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>( final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>(
name, name,
new ProcessorParameters<>(suppressionSupplier, name), new ProcessorParameters<>(suppressionSupplier, name),
StoreBuilderWrapper.wrapStoreBuilder(storeBuilder) new String[]{storeName}
); );
node.setOutputVersioned(false); node.setOutputVersioned(false);

View File

@ -16,12 +16,10 @@
*/ */
package org.apache.kafka.streams.kstream.internals.graph; package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.processor.internals.StoreFactory;
public class TableSuppressNode<K, V> extends StatefulProcessorNode<K, V> { public class TableSuppressNode<K, V> extends StatefulProcessorNode<K, V> {
public TableSuppressNode(final String nodeName, public TableSuppressNode(final String nodeName,
final ProcessorParameters<K, V, ?, ?> processorParameters, final ProcessorParameters<K, V, ?, ?> processorParameters,
final StoreFactory materializedKTableStoreBuilder) { final String[] storeNames) {
super(nodeName, processorParameters, materializedKTableStoreBuilder); super(nodeName, processorParameters, storeNames);
} }
} }

View File

@ -33,23 +33,26 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Maybe; import org.apache.kafka.streams.state.internals.Maybe;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer; import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import java.util.Set;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
public class KTableSuppressProcessorSupplier<K, V> implements public class KTableSuppressProcessorSupplier<K, V> implements
KTableProcessorSupplier<K, V, K, V> { KTableProcessorSupplier<K, V, K, V> {
private final SuppressedInternal<K> suppress; private final SuppressedInternal<K> suppress;
private final String storeName; private final StoreBuilder<?> storeBuilder;
private final KTableImpl<K, ?, V> parentKTable; private final KTableImpl<K, ?, V> parentKTable;
public KTableSuppressProcessorSupplier(final SuppressedInternal<K> suppress, public KTableSuppressProcessorSupplier(final SuppressedInternal<K> suppress,
final String storeName, final StoreBuilder<?> storeBuilder,
final KTableImpl<K, ?, V> parentKTable) { final KTableImpl<K, ?, V> parentKTable) {
this.suppress = suppress; this.suppress = suppress;
this.storeName = storeName; this.storeBuilder = storeBuilder;
this.parentKTable = parentKTable; this.parentKTable = parentKTable;
// The suppress buffer requires seeing the old values, to support the prior value view. // The suppress buffer requires seeing the old values, to support the prior value view.
parentKTable.enableSendingOldValues(true); parentKTable.enableSendingOldValues(true);
@ -57,7 +60,12 @@ public class KTableSuppressProcessorSupplier<K, V> implements
@Override @Override
public Processor<K, Change<V>, K, Change<V>> get() { public Processor<K, Change<V>, K, Change<V>> get() {
return new KTableSuppressProcessor<>(suppress, storeName); return new KTableSuppressProcessor<>(suppress, storeBuilder.name());
}
@Override
public Set<StoreBuilder<?>> stores() {
return Set.of(storeBuilder);
} }
@Override @Override
@ -75,7 +83,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements
public void init(final ProcessorContext<?, ?> context) { public void init(final ProcessorContext<?, ?> context) {
parentGetter.init(context); parentGetter.init(context);
// the main processor is responsible for the buffer's lifecycle // the main processor is responsible for the buffer's lifecycle
buffer = requireNonNull(context.getStateStore(storeName)); buffer = requireNonNull(context.getStateStore(storeBuilder.name()));
} }
@Override @Override
@ -107,7 +115,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements
final String[] parentStores = parentValueGetterSupplier.storeNames(); final String[] parentStores = parentValueGetterSupplier.storeNames();
final String[] stores = new String[1 + parentStores.length]; final String[] stores = new String[1 + parentStores.length];
System.arraycopy(parentStores, 0, stores, 1, parentStores.length); System.arraycopy(parentStores, 0, stores, 1, parentStores.length);
stores[0] = storeName; stores[0] = storeBuilder.name();
return stores; return stores;
} }
}; };

View File

@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.StreamJoined; import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TableJoined; import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStore;
@ -1517,6 +1518,30 @@ public class StreamsBuilderTest {
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
} }
@Test
public void shouldWrapProcessorsForSuppress() {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class);
final WrapperRecorder counter = new WrapperRecorder();
props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
builder.stream("input", Consumed.as("source"))
.groupByKey()
.count(Named.as("count"))// wrapped 1
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10), Suppressed.BufferConfig.unbounded()).withName("suppressed")) // wrapped 2
.toStream(Named.as("toStream"))// wrapped 3
.to("output", Produced.as("sink"));
builder.build();
assertThat(counter.numWrappedProcessors(), CoreMatchers.is(3));
assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream", "suppressed"));
assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
}
@Test @Test
public void shouldWrapProcessorsForTimeWindowStreamAggregate() { public void shouldWrapProcessorsForTimeWindowStreamAggregate() {
final Map<Object, Object> props = dummyStreamsConfigMap(); final Map<Object, Object> props = dummyStreamsConfigMap();

View File

@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer;
import org.apache.kafka.test.MockInternalNewProcessorContext; import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.StreamsTestUtils;
@ -38,6 +39,7 @@ import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness; import org.mockito.quality.Strictness;
@ -144,7 +146,7 @@ public class KTableSuppressProcessorMetricsTest {
final Processor<String, Change<Long>, String, Change<Long>> processor = final Processor<String, Change<Long>, String, Change<Long>> processor =
new KTableSuppressProcessorSupplier<>( new KTableSuppressProcessorSupplier<>(
(SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)), (SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)),
storeName, mockBuilderWithName(storeName),
mock mock
).get(); ).get();
@ -206,4 +208,10 @@ public class KTableSuppressProcessorMetricsTest {
assertThat(metrics.get(metricName).metricName().description(), is(metricName.description())); assertThat(metrics.get(metricName).metricName().description(), is(metricName.description()));
assertThat((T) metrics.get(metricName).metricValue(), matcher); assertThat((T) metrics.get(metricName).metricValue(), matcher);
} }
private StoreBuilder<?> mockBuilderWithName(final String name) {
final StoreBuilder<?> builder = Mockito.mock(StoreBuilder.class);
Mockito.when(builder.name()).thenReturn(name);
return builder;
}
} }

View File

@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer;
import org.apache.kafka.test.MockInternalNewProcessorContext; import org.apache.kafka.test.MockInternalNewProcessorContext;
@ -43,6 +44,7 @@ import org.hamcrest.Description;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness; import org.mockito.quality.Strictness;
@ -92,7 +94,7 @@ public class KTableSuppressProcessorTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final KTableImpl<K, ?, V> parent = mock(KTableImpl.class); final KTableImpl<K, ?, V> parent = mock(KTableImpl.class);
final Processor<K, Change<V>, K, Change<V>> processor = final Processor<K, Change<V>, K, Change<V>> processor =
new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) suppressed, storeName, parent).get(); new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) suppressed, mockBuilderWithName(storeName), parent).get();
final MockInternalNewProcessorContext<K, Change<V>> context = new MockInternalNewProcessorContext<>(); final MockInternalNewProcessorContext<K, Change<V>> context = new MockInternalNewProcessorContext<>();
context.setCurrentNode(new ProcessorNode("testNode")); context.setCurrentNode(new ProcessorNode("testNode"));
@ -487,4 +489,10 @@ public class KTableSuppressProcessorTest {
new TimeWindowedDeserializer<>(kSerde.deserializer(), windowSize) new TimeWindowedDeserializer<>(kSerde.deserializer(), windowSize)
); );
} }
private static StoreBuilder<?> mockBuilderWithName(final String name) {
final StoreBuilder<?> builder = Mockito.mock(StoreBuilder.class);
Mockito.when(builder.name()).thenReturn(name);
return builder;
}
} }