diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 40c565c8ceb..9b0a666d807 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -69,7 +69,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; -import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper; import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -561,12 +560,6 @@ public class KTableImpl extends AbstractStream implements KTable< final String storeName = suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME); - final ProcessorSupplier, K, Change> suppressionSupplier = new KTableSuppressProcessorSupplier<>( - suppressedInternal, - storeName, - this - ); - final StoreBuilder>> storeBuilder; if (suppressedInternal.bufferConfig().isLoggingEnabled()) { @@ -584,10 +577,16 @@ public class KTableImpl extends AbstractStream implements KTable< .withLoggingDisabled(); } + final ProcessorSupplier, K, Change> suppressionSupplier = new KTableSuppressProcessorSupplier<>( + suppressedInternal, + storeBuilder, + this + ); + final ProcessorGraphNode> node = new TableSuppressNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), - StoreBuilderWrapper.wrapStoreBuilder(storeBuilder) + new String[]{storeName} ); node.setOutputVersioned(false); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java index 88e55f37a25..595d0266aae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java @@ -16,12 +16,10 @@ */ package org.apache.kafka.streams.kstream.internals.graph; -import org.apache.kafka.streams.processor.internals.StoreFactory; - public class TableSuppressNode extends StatefulProcessorNode { public TableSuppressNode(final String nodeName, final ProcessorParameters processorParameters, - final StoreFactory materializedKTableStoreBuilder) { - super(nodeName, processorParameters, materializedKTableStoreBuilder); + final String[] storeNames) { + super(nodeName, processorParameters, storeNames); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java index 0b0c6ca15e9..637c0a757e3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java @@ -33,23 +33,26 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.Maybe; import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer; +import java.util.Set; + import static java.util.Objects.requireNonNull; public class KTableSuppressProcessorSupplier implements KTableProcessorSupplier { private final SuppressedInternal suppress; - private final String storeName; + private final StoreBuilder storeBuilder; private final KTableImpl parentKTable; public KTableSuppressProcessorSupplier(final SuppressedInternal suppress, - final String storeName, + final StoreBuilder storeBuilder, final KTableImpl parentKTable) { this.suppress = suppress; - this.storeName = storeName; + this.storeBuilder = storeBuilder; this.parentKTable = parentKTable; // The suppress buffer requires seeing the old values, to support the prior value view. parentKTable.enableSendingOldValues(true); @@ -57,7 +60,12 @@ public class KTableSuppressProcessorSupplier implements @Override public Processor, K, Change> get() { - return new KTableSuppressProcessor<>(suppress, storeName); + return new KTableSuppressProcessor<>(suppress, storeBuilder.name()); + } + + @Override + public Set> stores() { + return Set.of(storeBuilder); } @Override @@ -75,7 +83,7 @@ public class KTableSuppressProcessorSupplier implements public void init(final ProcessorContext context) { parentGetter.init(context); // the main processor is responsible for the buffer's lifecycle - buffer = requireNonNull(context.getStateStore(storeName)); + buffer = requireNonNull(context.getStateStore(storeBuilder.name())); } @Override @@ -107,7 +115,7 @@ public class KTableSuppressProcessorSupplier implements final String[] parentStores = parentValueGetterSupplier.storeNames(); final String[] stores = new String[1 + parentStores.length]; System.arraycopy(parentStores, 0, stores, 1, parentStores.length); - stores[0] = storeName; + stores[0] = storeBuilder.name(); return stores; } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 71905e1481c..08e413703c1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.TableJoined; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.processor.StateStore; @@ -1517,6 +1518,30 @@ public class StreamsBuilderTest { assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); } + @Test + public void shouldWrapProcessorsForSuppress() { + final Map props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.stream("input", Consumed.as("source")) + .groupByKey() + .count(Named.as("count"))// wrapped 1 + .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10), Suppressed.BufferConfig.unbounded()).withName("suppressed")) // wrapped 2 + .toStream(Named.as("toStream"))// wrapped 3 + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(3)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream", "suppressed")); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2)); + } + @Test public void shouldWrapProcessorsForTimeWindowStreamAggregate() { final Map props = dummyStreamsConfigMap(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java index bfa803f89bb..d051d6a6acf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer; import org.apache.kafka.test.MockInternalNewProcessorContext; import org.apache.kafka.test.StreamsTestUtils; @@ -38,6 +39,7 @@ import org.apache.kafka.test.TestUtils; import org.hamcrest.Matcher; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -144,7 +146,7 @@ public class KTableSuppressProcessorMetricsTest { final Processor, String, Change> processor = new KTableSuppressProcessorSupplier<>( (SuppressedInternal) Suppressed.untilTimeLimit(Duration.ofDays(100), maxRecords(1)), - storeName, + mockBuilderWithName(storeName), mock ).get(); @@ -206,4 +208,10 @@ public class KTableSuppressProcessorMetricsTest { assertThat(metrics.get(metricName).metricName().description(), is(metricName.description())); assertThat((T) metrics.get(metricName).metricValue(), matcher); } + + private StoreBuilder mockBuilderWithName(final String name) { + final StoreBuilder builder = Mockito.mock(StoreBuilder.class); + Mockito.when(builder.name()).thenReturn(name); + return builder; + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index 36d09f5bafe..e34e58c78c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.streams.processor.api.MockProcessorContext; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer; import org.apache.kafka.test.MockInternalNewProcessorContext; @@ -43,6 +44,7 @@ import org.hamcrest.Description; import org.hamcrest.Matcher; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -92,7 +94,7 @@ public class KTableSuppressProcessorTest { @SuppressWarnings("unchecked") final KTableImpl parent = mock(KTableImpl.class); final Processor, K, Change> processor = - new KTableSuppressProcessorSupplier<>((SuppressedInternal) suppressed, storeName, parent).get(); + new KTableSuppressProcessorSupplier<>((SuppressedInternal) suppressed, mockBuilderWithName(storeName), parent).get(); final MockInternalNewProcessorContext> context = new MockInternalNewProcessorContext<>(); context.setCurrentNode(new ProcessorNode("testNode")); @@ -487,4 +489,10 @@ public class KTableSuppressProcessorTest { new TimeWindowedDeserializer<>(kSerde.deserializer(), windowSize) ); } + + private static StoreBuilder mockBuilderWithName(final String name) { + final StoreBuilder builder = Mockito.mock(StoreBuilder.class); + Mockito.when(builder.name()).thenReturn(name); + return builder; + } } \ No newline at end of file