diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index bdd6dc3b37a..293bc6b7a86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -389,6 +389,16 @@ public interface KTable<K, V> {
      */
     <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 
+    /**
+     * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
+     *
+     * This controls what updates downstream table and stream operations will receive.
+     *
+     * @param suppressed Configuration object determining what, if any, updates to suppress
+     * @return A new {@code KTable} with the desired suppression characteristics.
+     */
+    KTable<K, V> suppress(final Suppressed<K> suppressed);
+
     /**
      * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
      * (with possibly a new type), with default serializers, deserializers, and state store.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
new file mode 100644
index 00000000000..7488ef6ff37
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
+import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+
+import java.time.Duration;
+
+public interface Suppressed<K> {
+
+    /**
+     * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly
+     * enforce the time bound and never emit early.
+     */
+    interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> {
+
+    }
+
+    interface BufferConfig<BC extends BufferConfig<BC>> {
+        /**
+         * Create a size-constrained buffer in terms of the maximum number of keys it will store.
+         */
+        static BufferConfig maxRecords(final long recordLimit) {
+            return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
+        }
+
+        /**
+         * Set a size constraint on the buffer in terms of the maximum number of keys it will store.
+         */
+        BC withMaxRecords(final long recordLimit);
+
+        /**
+         * Create a size-constrained buffer in terms of the maximum number of bytes it will use.
+         */
+        static BufferConfig maxBytes(final long byteLimit) {
+            return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
+        }
+
+        /**
+         * Set a size constraint on the buffer in terms of the maximum number of bytes it will use.
+         */
+        BC withMaxBytes(final long byteLimit);
+
+        /**
+         * Create a buffer unconstrained by size (either keys or bytes).
+         *
+         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+         *
+         * If there isn't enough heap available to meet the demand, the application will encounter an
+         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
+         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
+         *
+         * This is a convenient option if you doubt that your buffer will be that large, but also don't
+         * wish to pick particular constraints, such as in testing.
+         *
+         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+         * It will never emit early.
+         */
+        static StrictBufferConfig unbounded() {
+            return new StrictBufferConfigImpl();
+        }
+
+        /**
+         * Set the buffer to be unconstrained by size (either keys or bytes).
+         *
+         * As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
+         *
+         * If there isn't enough heap available to meet the demand, the application will encounter an
+         * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
+         * JVM processes under extreme memory pressure may exhibit poor GC behavior.
+         *
+         * This is a convenient option if you doubt that your buffer will be that large, but also don't
+         * wish to pick particular constraints, such as in testing.
+         *
+         * This buffer is "strict" in the sense that it will enforce the time bound or crash.
+         * It will never emit early.
+         */
+        StrictBufferConfig withNoBound();
+
+        /**
+         * Set the buffer to gracefully shut down the application when any of its constraints are violated.
+         *
+         * This buffer is "strict" in the sense that it will enforce the time bound or shut down.
+         * It will never emit early.
+         */
+        StrictBufferConfig shutDownWhenFull();
+
+        /**
+         * Set the buffer to use on-disk storage if it requires more memory than the constraints allow.
+         *
+         * This buffer is "strict" in the sense that it will never emit early.
+         */
+        StrictBufferConfig spillToDiskWhenFull();
+
+        /**
+         * Set the buffer to just emit the oldest records when any of its constraints are violated.
+         *
+         * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
+         * duplicate results downstream, but does not promise to eliminate them.
+         */
+        BufferConfig emitEarlyWhenFull();
+    }
+
+    /**
+     * Configure the suppression to emit only the "final results" from the window.
+     *
+     * By default, all Streams operators emit results whenever new results are available.
+     * This includes windowed operations.
+     *
+     * This configuration will instead emit just one result per key for each window, guaranteeing
+     * to deliver only the final result. This option is suitable for use cases in which the business logic
+     * requires a hard guarantee that only the final result is propagated. For example, sending alerts.
+     *
+     * To accomplish this, the operator will buffer events from the window until the window close (that is,
+     * until the end-time passes, and additionally until the grace period expires). Since windowed operators
+     * are required to reject late events for a window whose grace period is expired, there is an additional
+     * guarantee that the final results emitted from this suppression will match any queryable state upstream.
+     *
+     * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
+     *                     This is required to be a "strict" config, since it would violate the "final results"
+     *                     property to emit early and then issue an update later.
+     * @param <K> The key type for the KTable to apply this suppression to. "Final results" mode is only available
+     *            on Windowed KTables (this is enforced by the type parameter).
+     * @return a "final results" mode suppression configuration
+     */
+    static <K extends Windowed> Suppressed<K> untilWindowCloses(final StrictBufferConfig bufferConfig) {
+        return new FinalResultsSuppressionBuilder<>(bufferConfig);
+    }
+
+    /**
+     * Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of time after receiving a record
+     * before emitting it further downstream. If another record for the same key arrives in the meantime, it replaces
+     * the first record in the buffer but does not restart the timer.
+     *
+     * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
+     * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
+     * @param <K> The key type for the KTable to apply this suppression to.
+     * @return a suppression configuration
+     */
+    static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) {
+        return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null);
+    }
+}
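For orientation, a usage sketch of the new operator (illustrative only, not part of the patch: topic names, serdes, and window parameters are assumed, as are the Duration-based TimeWindows factories, whose exact signatures vary by version):

    final StreamsBuilder builder = new StreamsBuilder();

    // "Final results" mode: emit a single result per key and window, once the
    // window closes (end time plus grace period).
    final KTable<Windowed<String>, Long> finalCounts = builder
        .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
        .count()
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

    // Time-limit mode on a non-windowed table: after each update to a key, wait
    // ten minutes for further updates to that key before forwarding, buffering
    // at most 1000 keys at a time.
    final KTable<String, Long> throttled = builder
        .table("totals", Consumed.with(Serdes.String(), Serdes.Long()))
        .suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(10), Suppressed.BufferConfig.maxRecords(1000L)));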
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 5a3c897f781..b89399bf7b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
     private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
 
     private final String storeName;
@@ -49,11 +49,11 @@ class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSup
 
     private boolean sendOldValues = false;
 
-    KStreamSessionWindowAggregate(final SessionWindows windows,
-                                  final String storeName,
-                                  final Initializer<Agg> initializer,
-                                  final Aggregator<? super K, ? super V, Agg> aggregator,
-                                  final Merger<? super K, Agg> sessionMerger) {
+    public KStreamSessionWindowAggregate(final SessionWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super V, Agg> aggregator,
+                                         final Merger<? super K, Agg> sessionMerger) {
         this.windows = windows;
         this.storeName = storeName;
         this.initializer = initializer;
@@ -66,6 +66,10 @@ class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSup
         return new KStreamSessionWindowAggregateProcessor();
     }
 
+    public SessionWindows windows() {
+        return windows;
+    }
+
     @Override
     public void enableSendingOldValues() {
         sendOldValues = true;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 57542847b96..f29251573e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -44,10 +44,10 @@ public class KStreamWindowAggregate implements KStr private boolean sendOldValues = false; - KStreamWindowAggregate(final Windows windows, - final String storeName, - final Initializer initializer, - final Aggregator aggregator) { + public KStreamWindowAggregate(final Windows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { this.windows = windows; this.storeName = storeName; this.initializer = initializer; @@ -59,6 +59,10 @@ public class KStreamWindowAggregate implements KStr return new KStreamWindowAggregateProcessor(); } + public Windows windows() { + return windows; + } + @Override public void enableSendingOldValues() { sendOldValues = true; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 352e42d3918..2330fad1b16 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -26,21 +26,30 @@ import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode; +import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder; +import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor; +import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; +import java.time.Duration; +import java.util.Collections; import java.util.Objects; import java.util.Set; +import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil.findAndVerifyWindowGrace; + /** * The implementation class of {@link KTable}. 
* @@ -66,6 +75,8 @@ public class KTableImpl extends AbstractStream implements KTable extends AbstractStream implements KTable suppress(final Suppressed suppressed) { + final String name = builder.newProcessorName(SUPPRESS_NAME); + + final ProcessorSupplier> suppressionSupplier = + () -> new KTableSuppressProcessor<>(buildSuppress(suppressed)); + + final ProcessorParameters> processorParameters = new ProcessorParameters<>( + suppressionSupplier, + name + ); + + final ProcessorGraphNode> node = new ProcessorGraphNode<>(name, processorParameters, false); + + builder.addGraphNode(streamsGraphNode, node); + + return new KTableImpl( + builder, + name, + suppressionSupplier, + keySerde, + valSerde, + Collections.singleton(this.name), + null, + false, + node + ); + } + + @SuppressWarnings("unchecked") + private SuppressedImpl buildSuppress(final Suppressed suppress) { + if (suppress instanceof FinalResultsSuppressionBuilder) { + final long grace = findAndVerifyWindowGrace(streamsGraphNode); + + final FinalResultsSuppressionBuilder builder = (FinalResultsSuppressionBuilder) suppress; + + final SuppressedImpl finalResultsSuppression = + builder.buildFinalResultsSuppression(Duration.ofMillis(grace)); + + return (SuppressedImpl) finalResultsSuppression; + } else if (suppress instanceof SuppressedImpl) { + return (SuppressedImpl) suppress; + } else { + throw new IllegalArgumentException("Custom subclasses of Suppressed are not allowed."); + } + } + @Override public KTable join(final KTable other, final ValueJoiner joiner) { @@ -492,12 +550,12 @@ public class KTableImpl extends AbstractStream implements KTable extends AbstractStream implements KTable> selectSupplier = new KTableRepartitionMap<>(this, selector); - final ProcessorParameters processorParameters = new ProcessorParameters<>(selectSupplier, selectName); + final ProcessorParameters> processorParameters = new ProcessorParameters<>(selectSupplier, selectName); // select the aggregate key and values (old and new), it would require parent to send old values - final ProcessorGraphNode groupByMapNode = new ProcessorGraphNode<>( + final ProcessorGraphNode> groupByMapNode = new ProcessorGraphNode<>( selectName, processorParameters, false diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java new file mode 100644 index 00000000000..306ddf5cf5e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.graph; + +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.Windows; +import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate; +import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +public final class GraphGraceSearchUtil { + private GraphGraceSearchUtil() {} + + public static long findAndVerifyWindowGrace(final StreamsGraphNode streamsGraphNode) { + return findAndVerifyWindowGrace(streamsGraphNode, ""); + } + + private static long findAndVerifyWindowGrace(final StreamsGraphNode streamsGraphNode, final String chain) { + // error base case: we traversed off the end of the graph without finding a window definition + if (streamsGraphNode == null) { + throw new TopologyException( + "Window close time is only defined for windowed computations. Got [" + chain + "]." + ); + } + // base case: return if this node defines a grace period. + { + final Long gracePeriod = extractGracePeriod(streamsGraphNode); + if (gracePeriod != null) { + return gracePeriod; + } + } + + final String newChain = chain.equals("") ? streamsGraphNode.nodeName() : streamsGraphNode.nodeName() + "->" + chain; + + if (streamsGraphNode.parentNodes().isEmpty()) { + // error base case: we traversed to the end of the graph without finding a window definition + throw new TopologyException( + "Window close time is only defined for windowed computations. Got [" + newChain + "]." + ); + } + + // recursive case: all parents must define a grace period, and we use the max of our parents' graces. + long inheritedGrace = -1; + for (final StreamsGraphNode parentNode : streamsGraphNode.parentNodes()) { + final long parentGrace = findAndVerifyWindowGrace(parentNode, newChain); + inheritedGrace = Math.max(inheritedGrace, parentGrace); + } + + if (inheritedGrace == -1) { + throw new IllegalStateException(); // shouldn't happen, and it's not a legal grace period + } + + return inheritedGrace; + } + + private static Long extractGracePeriod(final StreamsGraphNode node) { + if (node instanceof StatefulProcessorNode) { + final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().processorSupplier(); + if (processorSupplier instanceof KStreamWindowAggregate) { + final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier; + final Windows windows = kStreamWindowAggregate.windows(); + return windows.gracePeriodMs(); + } else if (processorSupplier instanceof KStreamSessionWindowAggregate) { + final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier; + final SessionWindows windows = kStreamSessionWindowAggregate.windows(); + return windows.gracePeriodMs(); + } else { + return null; + } + } else { + return null; + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java new file mode 100644 index 00000000000..e731dc6f5e1 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java
new file mode 100644
index 00000000000..e731dc6f5e1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.kstream.Suppressed;
+
+import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
+
+abstract class BufferConfigImpl<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
+    public abstract long maxKeys();
+
+    public abstract long maxBytes();
+
+    @SuppressWarnings("unused")
+    public abstract BufferFullStrategy bufferFullStrategy();
+
+    @Override
+    public Suppressed.StrictBufferConfig withNoBound() {
+        return new StrictBufferConfigImpl(
+            Long.MAX_VALUE,
+            Long.MAX_VALUE,
+            SHUT_DOWN // doesn't matter, given the bounds
+        );
+    }
+
+    @Override
+    public Suppressed.StrictBufferConfig shutDownWhenFull() {
+        return new StrictBufferConfigImpl(maxKeys(), maxBytes(), SHUT_DOWN);
+    }
+
+    @Override
+    public Suppressed.BufferConfig emitEarlyWhenFull() {
+        return new EagerBufferConfigImpl(maxKeys(), maxBytes());
+    }
+
+    @Override
+    public Suppressed.StrictBufferConfig spillToDiskWhenFull() {
+        throw new UnsupportedOperationException("not implemented");
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java
new file mode 100644
index 00000000000..2da7c141825
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.kafka.streams.kstream.internals.suppress; + +public enum BufferFullStrategy { + EMIT, + SPILL_TO_DISK, + SHUT_DOWN +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java new file mode 100644 index 00000000000..0c2c883e18a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.Suppressed; + +import java.util.Objects; + +public class EagerBufferConfigImpl extends BufferConfigImpl { + + private final long maxKeys; + private final long maxBytes; + + public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) { + this.maxKeys = maxKeys; + this.maxBytes = maxBytes; + } + + @Override + public Suppressed.BufferConfig withMaxRecords(final long recordLimit) { + return new EagerBufferConfigImpl(recordLimit, maxBytes); + } + + @Override + public Suppressed.BufferConfig withMaxBytes(final long byteLimit) { + return new EagerBufferConfigImpl(maxKeys, byteLimit); + } + + @Override + public long maxKeys() { + return maxKeys; + } + + @Override + public long maxBytes() { + return maxBytes; + } + + @Override + public BufferFullStrategy bufferFullStrategy() { + return BufferFullStrategy.EMIT; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o; + return maxKeys == that.maxKeys && + maxBytes == that.maxBytes; + } + + @Override + public int hashCode() { + return Objects.hash(maxKeys, maxBytes); + } + + @Override + public String toString() { + return "EagerBufferConfigImpl{maxKeys=" + maxKeys + ", maxBytes=" + maxBytes + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java new file mode 100644 index 00000000000..548f5991dbb --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.ProcessorContext; + +import java.time.Duration; +import java.util.Objects; + +public class FinalResultsSuppressionBuilder implements Suppressed { + private final StrictBufferConfig bufferConfig; + + public FinalResultsSuppressionBuilder(final Suppressed.StrictBufferConfig bufferConfig) { + this.bufferConfig = bufferConfig; + } + + public SuppressedImpl buildFinalResultsSuppression(final Duration gracePeriod) { + return new SuppressedImpl<>( + gracePeriod, + bufferConfig, + (ProcessorContext context, K key) -> key.window().end() + ); + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final FinalResultsSuppressionBuilder that = (FinalResultsSuppressionBuilder) o; + return Objects.equals(bufferConfig, that.bufferConfig); + } + + @Override + public int hashCode() { + return Objects.hash(bufferConfig); + } + + @Override + public String toString() { + return "FinalResultsSuppressionBuilder{bufferConfig=" + bufferConfig + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java new file mode 100644 index 00000000000..f65f2b4af20 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; + +import java.time.Duration; + +public class KTableSuppressProcessor implements Processor> { + private final SuppressedImpl suppress; + private InternalProcessorContext internalProcessorContext; + + public KTableSuppressProcessor(final SuppressedImpl suppress) { + this.suppress = suppress; + } + + @Override + public void init(final ProcessorContext context) { + internalProcessorContext = (InternalProcessorContext) context; + } + + @Override + public void process(final K key, final Change value) { + if (suppress.getTimeToWaitForMoreEvents() == Duration.ZERO && definedRecordTime(key) <= internalProcessorContext.streamTime()) { + internalProcessorContext.forward(key, value); + } else { + throw new NotImplementedException(); + } + } + + private long definedRecordTime(final K key) { + return suppress.getTimeDefinition().time(internalProcessorContext, key); + } + + @Override + public void close() { + } + + @Override + public String toString() { + return "KTableSuppressProcessor{suppress=" + suppress + '}'; + } + + static class NotImplementedException extends RuntimeException { + NotImplementedException() { + super(); + } + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java new file mode 100644 index 00000000000..0634a748a5b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.Suppressed; + +import java.util.Objects; + +import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; + +public class StrictBufferConfigImpl extends BufferConfigImpl implements Suppressed.StrictBufferConfig { + + private final long maxKeys; + private final long maxBytes; + private final BufferFullStrategy bufferFullStrategy; + + public StrictBufferConfigImpl(final long maxKeys, + final long maxBytes, + final BufferFullStrategy bufferFullStrategy) { + this.maxKeys = maxKeys; + this.maxBytes = maxBytes; + this.bufferFullStrategy = bufferFullStrategy; + } + + public StrictBufferConfigImpl() { + this.maxKeys = Long.MAX_VALUE; + this.maxBytes = Long.MAX_VALUE; + this.bufferFullStrategy = SHUT_DOWN; + } + + @Override + public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) { + return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy); + } + + @Override + public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) { + return new StrictBufferConfigImpl(maxKeys, byteLimit, bufferFullStrategy); + } + + @Override + public long maxKeys() { + return maxKeys; + } + + @Override + public long maxBytes() { + return maxBytes; + } + + @Override + public BufferFullStrategy bufferFullStrategy() { + return bufferFullStrategy; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o; + return maxKeys == that.maxKeys && + maxBytes == that.maxBytes && + bufferFullStrategy == that.bufferFullStrategy; + } + + @Override + public int hashCode() { + return Objects.hash(maxKeys, maxBytes, bufferFullStrategy); + } + + @Override + public String toString() { + return "StrictBufferConfigImpl{maxKeys=" + maxKeys + + ", maxBytes=" + maxBytes + + ", bufferFullStrategy=" + bufferFullStrategy + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java new file mode 100644 index 00000000000..cffc42b66d5 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.processor.ProcessorContext; + +import java.time.Duration; +import java.util.Objects; + +public class SuppressedImpl implements Suppressed { + private static final Duration DEFAULT_SUPPRESSION_TIME = Duration.ofMillis(Long.MAX_VALUE); + private static final StrictBufferConfig DEFAULT_BUFFER_CONFIG = BufferConfig.unbounded(); + + private final BufferConfig bufferConfig; + private final Duration timeToWaitForMoreEvents; + private final TimeDefinition timeDefinition; + + public SuppressedImpl(final Duration suppressionTime, + final BufferConfig bufferConfig, + final TimeDefinition timeDefinition) { + this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime; + this.timeDefinition = timeDefinition == null ? (context, anyKey) -> context.timestamp() : timeDefinition; + this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : bufferConfig; + } + + interface TimeDefinition { + long time(final ProcessorContext context, final K key); + } + + TimeDefinition getTimeDefinition() { + return timeDefinition; + } + + Duration getTimeToWaitForMoreEvents() { + return timeToWaitForMoreEvents == null ? Duration.ZERO : timeToWaitForMoreEvents; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final SuppressedImpl that = (SuppressedImpl) o; + return Objects.equals(bufferConfig, that.bufferConfig) && + Objects.equals(getTimeToWaitForMoreEvents(), that.getTimeToWaitForMoreEvents()) && + Objects.equals(getTimeDefinition(), that.getTimeDefinition()); + } + + @Override + public int hashCode() { + return Objects.hash(bufferConfig, getTimeToWaitForMoreEvents(), getTimeDefinition()); + } + + @Override + public String toString() { + return "SuppressedImpl{" + + ", bufferConfig=" + bufferConfig + + ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents + + ", timeDefinition=" + timeDefinition + + '}'; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java new file mode 100644 index 00000000000..421311257b4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams; + +public class KeyValueTimestamp { + private final K key; + private final V value; + private final long timestamp; + + public KeyValueTimestamp(final K key, final V value, final long timestamp) { + this.key = key; + this.value = value; + this.timestamp = timestamp; + } + + public K key() { + return key; + } + + public V value() { + return value; + } + + public long timestamp() { + return timestamp; + } + + @Override + public String toString() { + return "KeyValueTimestamp{key=" + key + ", value=" + value + ", timestamp=" + timestamp + '}'; + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java new file mode 100644 index 00000000000..a0e78580d45 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.IntegrationTest; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.time.Duration; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; +import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; + +@Category({IntegrationTest.class}) +public class SuppressionIntegrationTest { + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer(); + private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); + private static final Serde STRING_SERDE = Serdes.String(); + private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); + private static final int COMMIT_INTERVAL = 100; + private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2; + + @Test + public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws InterruptedException { + final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(input, outputSuppressed, outputRaw); + + final StreamsBuilder builder = new StreamsBuilder(); + final KTable 
valueCounts = builder + .table( + input, + Consumed.with(STRING_SERDE, STRING_SERDE), + Materialized.>with(STRING_SERDE, STRING_SERDE) + .withCachingDisabled() + .withLoggingDisabled() + ) + .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE)) + .count(Materialized.>as("counts").withCachingDisabled()); + + valueCounts + .suppress(untilTimeLimit(Duration.ZERO, unbounded())) + .toStream() + .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); + + valueCounts + .toStream() + .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); + + final KafkaStreams driver = getCleanStartedStreams(appId, builder); + + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(4L)) + ) + ); + verifyOutput( + outputRaw, + asList( + new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), + new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), + new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), + new KeyValueTimestamp<>("x", 1L, scaledTime(4L)) + ) + ); + verifyOutput( + outputSuppressed, + asList( + new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), + new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), + new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), + new KeyValueTimestamp<>("x", 1L, scaledTime(4L)) + ) + ); + } finally { + driver.close(); + cleanStateAfterTest(driver); + } + } + + private void cleanStateBeforeTest(final String... topic) throws InterruptedException { + CLUSTER.deleteAllTopicsAndWait(30_000L); + for (final String s : topic) { + CLUSTER.createTopic(s, 1, 1); + } + } + + private KafkaStreams getCleanStartedStreams(final String appId, final StreamsBuilder builder) { + final Properties streamsConfig = mkProperties(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(StreamsConfig.POLL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL)), + mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL)) + )); + final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig); + driver.cleanUp(); + driver.start(); + return driver; + } + + private void cleanStateAfterTest(final KafkaStreams driver) throws InterruptedException { + driver.cleanUp(); + CLUSTER.deleteAllTopicsAndWait(30_000L); + } + + private long scaledTime(final long unscaledTime) { + return SCALE_FACTOR * unscaledTime; + } + + private void produceSynchronously(final String topic, final List> toProduce) { + final Properties producerConfig = mkProperties(mkMap( + mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"), + mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()), + mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()), + mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()) + )); + try (final Producer producer = new KafkaProducer<>(producerConfig)) { + // TODO: test EOS + //noinspection ConstantConditions + if (false) { + producer.initTransactions(); + producer.beginTransaction(); + } + final LinkedList> futures = new LinkedList<>(); + for (final KeyValueTimestamp record : toProduce) { + final Future f = producer.send( + new ProducerRecord<>(topic, null, 
record.timestamp(), record.key(), record.value(), null) + ); + futures.add(f); + } + for (final Future future : futures) { + try { + future.get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + // TODO: test EOS + //noinspection ConstantConditions + if (false) { + producer.commitTransaction(); + } else { + producer.flush(); + } + } + } + + private void verifyOutput(final String topic, final List> expected) { + final List> results; + try { + final Properties properties = mkProperties( + mkMap( + mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"), + mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer) STRING_DESERIALIZER).getClass().getName()), + mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer) LONG_DESERIALIZER).getClass().getName()) + ) + ); + results = IntegrationTestUtils.waitUntilMinRecordsReceived(properties, topic, expected.size()); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + + if (results.size() != expected.size()) { + throw new AssertionError(printRecords(results) + " != " + expected); + } + final Iterator> expectedIterator = expected.iterator(); + for (final ConsumerRecord result : results) { + final KeyValueTimestamp expected1 = expectedIterator.next(); + try { + compareKeyValueTimestamp(result, expected1.key(), expected1.value(), expected1.timestamp()); + } catch (final AssertionError e) { + throw new AssertionError(printRecords(results) + " != " + expected, e); + } + } + } + + private void compareKeyValueTimestamp(final ConsumerRecord record, final K expectedKey, final V expectedValue, final long expectedTimestamp) { + Objects.requireNonNull(record); + final K recordKey = record.key(); + final V recordValue = record.value(); + final long recordTimestamp = record.timestamp(); + final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp + + " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp); + if (recordKey != null) { + if (!recordKey.equals(expectedKey)) { + throw error; + } + } else if (expectedKey != null) { + throw error; + } + if (recordValue != null) { + if (!recordValue.equals(expectedValue)) { + throw error; + } + } else if (expectedValue != null) { + throw error; + } + if (recordTimestamp != expectedTimestamp) { + throw error; + } + } + + private String printRecords(final List> result) { + final StringBuilder resultStr = new StringBuilder(); + resultStr.append("[\n"); + for (final ConsumerRecord record : result) { + resultStr.append(" ").append(record.toString()).append("\n"); + } + resultStr.append("]"); + return resultStr.toString(); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java new file mode 100644 index 00000000000..53f24b58aac --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl; +import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder; +import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl; +import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl; +import org.junit.Test; + +import static java.lang.Long.MAX_VALUE; +import static java.time.Duration.ofMillis; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; +import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; +import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class SuppressedTest { + + @Test + public void bufferBuilderShouldBeConsistent() { + assertThat( + "noBound should remove bounds", + maxBytes(2L).withMaxRecords(4L).withNoBound(), + is(unbounded()) + ); + + assertThat( + "keys alone should be set", + maxRecords(2L), + is(new EagerBufferConfigImpl(2L, MAX_VALUE)) + ); + + assertThat( + "size alone should be set", + maxBytes(2L), + is(new EagerBufferConfigImpl(MAX_VALUE, 2L)) + ); + } + + @Test + public void intermediateEventsShouldAcceptAnyBufferAndSetBounds() { + assertThat( + "time alone should be set", + untilTimeLimit(ofMillis(2), unbounded()), + is(new SuppressedImpl<>(ofMillis(2), unbounded(), null)) + ); + + assertThat( + "time and unbounded buffer should be set", + untilTimeLimit(ofMillis(2), unbounded()), + is(new SuppressedImpl<>(ofMillis(2), unbounded(), null)) + ); + + assertThat( + "time and keys buffer should be set", + untilTimeLimit(ofMillis(2), maxRecords(2)), + is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null)) + ); + + assertThat( + "time and size buffer should be set", + untilTimeLimit(ofMillis(2), maxBytes(2)), + is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null)) + ); + + assertThat( + "all constraints should be set", + untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)), + is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null)) + ); + } + + @Test + public void finalEventsShouldAcceptStrictBuffersAndSetBounds() { + + assertThat( + untilWindowCloses(unbounded()), + is(new FinalResultsSuppressionBuilder<>(unbounded())) + ); + + assertThat( + untilWindowCloses(maxRecords(2L).shutDownWhenFull()), + is(new FinalResultsSuppressionBuilder<>(new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN)) + ) + ); + + assertThat( + untilWindowCloses(maxBytes(2L).shutDownWhenFull()), + is(new FinalResultsSuppressionBuilder<>(new 
StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN)) + ) + ); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java new file mode 100644 index 00000000000..fead6788eb4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.apache.kafka.streams.test.OutputVerifier; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Properties; + +import static java.time.Duration.ZERO; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; +import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; + +public class SuppressScenarioTest { + private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer(); + private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); + private static final Serde STRING_SERDE = Serdes.String(); + private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); + + @Test + public void shouldImmediatelyEmitEventsWithZeroEmitAfter() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KTable valueCounts = builder + 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
new file mode 100644
index 00000000000..fead6788eb4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import static java.time.Duration.ZERO;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+
+public class SuppressScenarioTest {
+    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
+    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
+    private static final Serde<String> STRING_SERDE = Serdes.String();
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
+    @Test
+    public void shouldImmediatelyEmitEventsWithZeroEmitAfter() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KTable<String, Long> valueCounts = builder
+            .table(
+                "input",
+                Consumed.with(STRING_SERDE, STRING_SERDE),
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE)
+                    .withCachingDisabled()
+                    .withLoggingDisabled()
+            )
+            .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE))
+            .count();
+
+        valueCounts
+            .suppress(untilTimeLimit(ZERO, unbounded()))
+            .toStream()
+            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final Topology topology = builder.build();
+
+        final Properties config = Utils.mkProperties(Utils.mkMap(
+            Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())),
+            Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
+        ));
+
+        final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
+            driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L));
+            verify(
+                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, 0L),
+                    new KeyValueTimestamp<>("v1", 0L, 1L),
+                    new KeyValueTimestamp<>("v2", 1L, 1L),
+                    new KeyValueTimestamp<>("v1", 1L, 2L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, 0L),
+                    new KeyValueTimestamp<>("v1", 0L, 1L),
+                    new KeyValueTimestamp<>("v2", 1L, 1L),
+                    new KeyValueTimestamp<>("v1", 1L, 2L)
+                )
+            );
+            driver.pipeInput(recordFactory.create("input", "x", "x", 3L));
+            verify(
+                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    new KeyValueTimestamp<>("x", 1L, 3L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    new KeyValueTimestamp<>("x", 1L, 3L)
+                )
+            );
+            driver.pipeInput(recordFactory.create("input", "x", "x", 4L));
+            verify(
+                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("x", 0L, 4L),
+                    new KeyValueTimestamp<>("x", 1L, 4L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("x", 0L, 4L),
+                    new KeyValueTimestamp<>("x", 1L, 4L)
+                )
+            );
+        }
+    }
+
+    private void verify(final List<ProducerRecord<String, Long>> results, final List<KeyValueTimestamp<String, Long>> expectedResults) {
+        if (results.size() != expectedResults.size()) {
+            throw new AssertionError(printRecords(results) + " != " + expectedResults);
+        }
+        final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = expectedResults.iterator();
+        for (final ProducerRecord<String, Long> result : results) {
+            final KeyValueTimestamp<String, Long> expected = expectedIterator.next();
+            try {
+                OutputVerifier.compareKeyValueTimestamp(result, expected.key(), expected.value(), expected.timestamp());
+            } catch (final AssertionError e) {
+                throw new AssertionError(printRecords(results) + " != " + expectedResults, e);
+            }
+        }
+    }
+
+    private <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
+        final List<ProducerRecord<K, V>> result = new LinkedList<>();
+        for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
+             next != null;
+             next = driver.readOutput(topic, keyDeserializer, valueDeserializer)) {
+            result.add(next);
+        }
+        return new ArrayList<>(result);
+    }
+
+    private String printRecords(final List<ProducerRecord<String, Long>> result) {
+        final StringBuilder resultStr = new StringBuilder();
+        resultStr.append("[\n");
+        for (final ProducerRecord<String, Long> record : result) {
+            resultStr.append("    ").append(record.toString()).append("\n");
+        }
+        resultStr.append("]");
+        return resultStr.toString();
+    }
+}
ProcessorGraphNode<>("stateless", null); + statelessParent.addChild(node); + + final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); + assertThat(extracted, is(windows.gracePeriodMs())); + } + + @Test + public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() { + final StatefulProcessorNode leftParent = new StatefulProcessorNode<>( + "asdf", + new ProcessorParameters<>( + new KStreamSessionWindowAggregate( + SessionWindows.with(10L).grace(1234L), + "asdf", + null, + null, + null + ), + "asdf" + ), + null, + null, + false + ); + + final StatefulProcessorNode rightParent = new StatefulProcessorNode<>( + "asdf", + new ProcessorParameters<>( + new KStreamWindowAggregate( + TimeWindows.of(10L).grace(4321L), + "asdf", + null, + null + ), + "asdf" + ), + null, + null, + false + ); + + final ProcessorGraphNode node = new ProcessorGraphNode<>("stateless", null); + leftParent.addChild(node); + rightParent.addChild(node); + + final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); + assertThat(extracted, is(4321L)); + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java new file mode 100644 index 00000000000..466033316c7 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
new file mode 100644
index 00000000000..466033316c7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.suppress;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Suppressed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.test.MockInternalProcessorContext;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Collection;
+
+import static java.time.Duration.ZERO;
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
+import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@SuppressWarnings("PointlessArithmeticExpression")
+public class KTableSuppressProcessorTest {
+    /**
+     * Use this value to indicate that the test correctness does not depend on any particular number
+     */
+    private static final long ARBITRARY_LONG = 5L;
+
+    /**
+     * Use this value to indicate that the test correctness does not depend on any particular window
+     */
+    private static final TimeWindow ARBITRARY_WINDOW = new TimeWindow(0L, 100L);
+
+    @Test
+    public void zeroTimeLimitShouldImmediatelyEmit() {
+        final KTableSuppressProcessor<String, Long> processor =
+            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded())));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = ARBITRARY_LONG;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
+        final String key = "hey";
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @Test
+    public void windowedZeroTimeLimitShouldImmediatelyEmit() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded())));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = ARBITRARY_LONG;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
+        final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW);
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @Test
+    public void intermediateSuppressionShouldThrow() {
+        final KTableSuppressProcessor<String, Long> processor =
+            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(Duration.ofMillis(1), unbounded())));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        try {
+            processor.process("hey", new Change<>(null, 1L));
+            fail("expected an exception for now");
+        } catch (final KTableSuppressProcessor.NotImplementedException e) {
+            // expected
+        }
+        assertThat(context.forwarded(), hasSize(0));
+    }
+
+
+    @SuppressWarnings("unchecked")
+    private SuppressedImpl finalResults(final Duration grace) {
+        return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace);
+    }
+
+
+    @Test
+    public void finalResultsSuppressionShouldThrow() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(finalResults(ofMillis(1)));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        context.setTimestamp(ARBITRARY_LONG);
+        try {
+            processor.process(new Windowed<>("hey", ARBITRARY_WINDOW), new Change<>(ARBITRARY_LONG, ARBITRARY_LONG));
+            fail("expected an exception for now");
+        } catch (final KTableSuppressProcessor.NotImplementedException e) {
+            // expected
+        }
+        assertThat(context.forwarded(), hasSize(0));
+    }
+
+    @Test
+    public void finalResultsWith0GraceBeforeWindowEndShouldThrow() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(finalResults(ofMillis(0)));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 5L;
+        context.setTimestamp(timestamp);
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        try {
+            processor.process(key, value);
+            fail("expected an exception");
+        } catch (final KTableSuppressProcessor.NotImplementedException e) {
+            // expected
+        }
+        assertThat(context.forwarded(), hasSize(0));
+    }
+
+    @Test
+    public void finalResultsWith0GraceAtWindowEndShouldImmediatelyEmit() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor =
+            new KTableSuppressProcessor<>(finalResults(ofMillis(0)));
+
+        final MockInternalProcessorContext context = new MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
+        final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    private static <E> Matcher<Collection<E>> hasSize(final int i) {
+        return new BaseMatcher<Collection<E>>() {
+            @Override
+            public void describeTo(final Description description) {
+                description.appendText("a collection of size " + i);
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public boolean matches(final Object item) {
+                if (item == null) {
+                    return false;
+                } else {
+                    return ((Collection) item).size() == i;
+                }
+            }
+
+        };
+    }
+
+    private static SuppressedImpl getImpl(final Suppressed suppressed) {
+        return (SuppressedImpl) suppressed;
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
new file mode 100644
index 00000000000..14f8561030f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.ThreadCache;
+
+public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
+    private ProcessorNode currentNode;
+    private long streamTime;
+
+    @Override
+    public StreamsMetricsImpl metrics() {
+        return (StreamsMetricsImpl) super.metrics();
+    }
+
+    @Override
+    public ProcessorRecordContext recordContext() {
+        return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers());
+    }
+
+    @Override
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
+        setRecordMetadata(
+            recordContext.topic(),
+            recordContext.partition(),
+            recordContext.offset(),
+            recordContext.headers(),
+            recordContext.timestamp()
+        );
+    }
+
+    @Override
+    public void setCurrentNode(final ProcessorNode currentNode) {
+        this.currentNode = currentNode;
+    }
+
+    @Override
+    public ProcessorNode currentNode() {
+        return currentNode;
+    }
+
+    @Override
+    public ThreadCache getCache() {
+        return null;
+    }
+
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public void uninitialize() {
+
+    }
+
+    @Override
+    public long streamTime() {
+        return streamTime;
+    }
+
+    public void setStreamTime(final long streamTime) {
+        this.streamTime = streamTime;
+    }
+}
\ No newline at end of file
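The reason this mock exists: KTableSuppressProcessor's emit decisions depend on stream time, which the public MockProcessorContext does not model. Distilled as a sketch (the topic name and timestamps are arbitrary, using only methods defined in the class above):

import org.apache.kafka.test.MockInternalProcessorContext;

public class StreamTimeSketch {
    public static void main(final String[] args) {
        final MockInternalProcessorContext context = new MockInternalProcessorContext();
        // The record under processing carries its own timestamp...
        context.setRecordMetadata("input", 0, 0L, null, 99L);
        // ...while stream time (the high-water mark of observed timestamps)
        // can be advanced independently, e.g. to simulate a window closing.
        context.setStreamTime(100L);
        System.out.println("record time " + context.timestamp() + ", stream time " + context.streamTime());
    }
}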
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index cba02573b59..dc854b0a5f5 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -405,13 +405,18 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
     @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key, final V value) {
-        capturedForwards.add(new CapturedForward(To.all(), new KeyValue(key, value)));
+        forward(key, value, To.all());
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public <K, V> void forward(final K key, final V value, final To to) {
-        capturedForwards.add(new CapturedForward(to, new KeyValue(key, value)));
+        capturedForwards.add(
+            new CapturedForward(
+                to.timestamp == -1 ? to.withTimestamp(timestamp == null ? -1 : timestamp) : to,
+                new KeyValue(key, value)
+            )
+        );
     }
 
     @SuppressWarnings("deprecation")
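End to end, here is how the pieces introduced by this patch are meant to compose in an application. This is a sketch, not code from the PR: the topic names, serdes, and window sizes are invented, and as the tests above show, in this patch only the zero-time-limit and at-window-end paths actually emit, with the remaining paths still throwing NotImplementedException.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;

public class FinalResultsSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
            // grace() bounds lateness, and therefore how long suppress() must buffer.
            .windowedBy(TimeWindows.of(60_000L).grace(10_000L))
            .count()
            // Forward exactly one result per key per window, once the window
            // closes; unbounded() trades memory for the strict no-early-emit guarantee.
            .suppress(untilWindowCloses(unbounded()))
            .toStream((windowedKey, count) -> windowedKey.key())
            .to("final-counts", Produced.with(Serdes.String(), Serdes.Long()));
        // builder.build() would then be passed to a KafkaStreams instance to run.
    }
}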