KAFKA-7223: Suppress API with only immediate emit (#5567)

Part 1 of the suppression API.

* add the DSL suppress method and config objects
* add the processor, but only in "identity" mode (i.e., it will forward only if the suppression spec says to forward immediately)
* add tests

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
John Roesler 2018-09-24 15:27:39 -05:00 committed by Guozhang Wang
parent 86b4d9d58f
commit 057c5307e0
21 changed files with 1944 additions and 20 deletions

View File

@ -389,6 +389,16 @@ public interface KTable<K, V> {
*/
<KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
/**
* Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration.
*
* This controls what updates downstream table and stream operations will receive.
*
* @param suppressed Configuration object determining what, if any, updates to suppress
* @return A new KTable with the desired suppression characteristics.
*/
KTable<K, V> suppress(final Suppressed<K> suppressed);
/**
* Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value
* (with possibly a new type), with default serializers, deserializers, and state store.

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
import java.time.Duration;
public interface Suppressed<K> {
/**
* Marker interface for a buffer configuration that is "strict" in the sense that it will strictly
* enforce the time bound and never emit early.
*/
interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> {
}
interface BufferConfig<BC extends BufferConfig<BC>> {
/**
* Create a size-constrained buffer in terms of the maximum number of keys it will store.
*/
static BufferConfig<?> maxRecords(final long recordLimit) {
return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
}
/**
* Set a size constraint on the buffer in terms of the maximum number of keys it will store.
*/
BC withMaxRecords(final long recordLimit);
/**
* Create a size-constrained buffer in terms of the maximum number of bytes it will use.
*/
static BufferConfig<?> maxBytes(final long byteLimit) {
return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
}
/**
* Set a size constraint on the buffer, the maximum number of bytes it will use.
*/
BC withMaxBytes(final long byteLimit);
/**
* Create a buffer unconstrained by size (either keys or bytes).
*
* As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
*
* If there isn't enough heap available to meet the demand, the application will encounter an
* {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
* JVM processes under extreme memory pressure may exhibit poor GC behavior.
*
* This is a convenient option if you doubt that your buffer will be that large, but also don't
* wish to pick particular constraints, such as in testing.
*
* This buffer is "strict" in the sense that it will enforce the time bound or crash.
* It will never emit early.
*/
static StrictBufferConfig unbounded() {
return new StrictBufferConfigImpl();
}
/**
* Set the buffer to be unconstrained by size (either keys or bytes).
*
* As a result, the buffer will consume as much memory as it needs, dictated by the time bound.
*
* If there isn't enough heap available to meet the demand, the application will encounter an
* {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that
* JVM processes under extreme memory pressure may exhibit poor GC behavior.
*
* This is a convenient option if you doubt that your buffer will be that large, but also don't
* wish to pick particular constraints, such as in testing.
*
* This buffer is "strict" in the sense that it will enforce the time bound or crash.
* It will never emit early.
*/
StrictBufferConfig withNoBound();
/**
* Set the buffer to gracefully shut down the application when any of its constraints are violated
*
* This buffer is "strict" in the sense that it will enforce the time bound or shut down.
* It will never emit early.
*/
StrictBufferConfig shutDownWhenFull();
/**
* Sets the buffer to use on-disk storage if it requires more memory than the constraints allow.
*
* This buffer is "strict" in the sense that it will never emit early.
*/
StrictBufferConfig spillToDiskWhenFull();
/**
* Set the buffer to just emit the oldest records when any of its constraints are violated.
*
* This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing
* duplicate results downstream, but does not promise to eliminate them.
*/
BufferConfig emitEarlyWhenFull();
}
/**
* Configure the suppression to emit only the "final results" from the window.
*
* By default all Streams operators emit results whenever new results are available.
* This includes windowed operations.
*
* This configuration will instead emit just one result per key for each window, guaranteeing
* to deliver only the final result. This option is suitable for use cases in which the business logic
* requires a hard guarantee that only the final result is propagated. For example, sending alerts.
*
* To accomplish this, the operator will buffer events from the window until the window close (that is,
* until the end-time passes, and additionally until the grace period expires). Since windowed operators
* are required to reject late events for a window whose grace period is expired, there is an additional
* guarantee that the final results emitted from this suppression will match any queriable state upstream.
*
* @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
* This is required to be a "strict" config, since it would violate the "final results"
* property to emit early and then issue an update later.
* @param <K> The key type for the KTable to apply this suppression to. "Final results" mode is only available
* on Windowed KTables (this is enforced by the type parameter).
* @return a "final results" mode suppression configuration
*/
static <K extends Windowed> Suppressed<K> untilWindowCloses(final StrictBufferConfig bufferConfig) {
return new FinalResultsSuppressionBuilder<>(bufferConfig);
}
/**
* Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of time after receiving a record
* before emitting it further downstream. If another record for the same key arrives in the mean time, it replaces
* the first record in the buffer but does <em>not</em> re-start the timer.
*
* @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events.
* @param bufferConfig A configuration specifying how much space to use for buffering intermediate results.
* @param <K> The key type for the KTable to apply this suppression to.
* @return a suppression configuration
*/
static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) {
return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null);
}
}

View File

@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
private final String storeName;
@ -49,11 +49,11 @@ class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSup
private boolean sendOldValues = false;
KStreamSessionWindowAggregate(final SessionWindows windows,
final String storeName,
final Initializer<Agg> initializer,
final Aggregator<? super K, ? super V, Agg> aggregator,
final Merger<? super K, Agg> sessionMerger) {
public KStreamSessionWindowAggregate(final SessionWindows windows,
final String storeName,
final Initializer<Agg> initializer,
final Aggregator<? super K, ? super V, Agg> aggregator,
final Merger<? super K, Agg> sessionMerger) {
this.windows = windows;
this.storeName = storeName;
this.initializer = initializer;
@ -66,6 +66,10 @@ class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSup
return new KStreamSessionWindowAggregateProcessor();
}
public SessionWindows windows() {
return windows;
}
@Override
public void enableSendingOldValues() {
sendOldValues = true;

View File

@ -44,10 +44,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
private boolean sendOldValues = false;
KStreamWindowAggregate(final Windows<W> windows,
final String storeName,
final Initializer<Agg> initializer,
final Aggregator<? super K, ? super V, Agg> aggregator) {
public KStreamWindowAggregate(final Windows<W> windows,
final String storeName,
final Initializer<Agg> initializer,
final Aggregator<? super K, ? super V, Agg> aggregator) {
this.windows = windows;
this.storeName = storeName;
this.initializer = initializer;
@ -59,6 +59,10 @@ public class KStreamWindowAggregate<K, V, Agg, W extends Window> implements KStr
return new KStreamWindowAggregateProcessor();
}
public Windows<W> windows() {
return windows;
}
@Override
public void enableSendingOldValues() {
sendOldValues = true;

View File

@ -26,21 +26,30 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil.findAndVerifyWindowGrace;
/**
* The implementation class of {@link KTable}.
*
@ -66,6 +75,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String SELECT_NAME = "KTABLE-SELECT-";
private static final String SUPPRESS_NAME = "KTABLE-SUPPRESS-";
private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-";
@ -349,6 +360,53 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
return toStream().selectKey(mapper);
}
@Override
public KTable<K, V> suppress(final Suppressed<K> suppressed) {
final String name = builder.newProcessorName(SUPPRESS_NAME);
final ProcessorSupplier<K, Change<V>> suppressionSupplier =
() -> new KTableSuppressProcessor<>(buildSuppress(suppressed));
final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(
suppressionSupplier,
name
);
final ProcessorGraphNode<K, Change<V>> node = new ProcessorGraphNode<>(name, processorParameters, false);
builder.addGraphNode(streamsGraphNode, node);
return new KTableImpl<K, S, V>(
builder,
name,
suppressionSupplier,
keySerde,
valSerde,
Collections.singleton(this.name),
null,
false,
node
);
}
@SuppressWarnings("unchecked")
private SuppressedImpl<K> buildSuppress(final Suppressed<K> suppress) {
if (suppress instanceof FinalResultsSuppressionBuilder) {
final long grace = findAndVerifyWindowGrace(streamsGraphNode);
final FinalResultsSuppressionBuilder<?> builder = (FinalResultsSuppressionBuilder) suppress;
final SuppressedImpl<? extends Windowed> finalResultsSuppression =
builder.buildFinalResultsSuppression(Duration.ofMillis(grace));
return (SuppressedImpl<K>) finalResultsSuppression;
} else if (suppress instanceof SuppressedImpl) {
return (SuppressedImpl<K>) suppress;
} else {
throw new IllegalArgumentException("Custom subclasses of Suppressed are not allowed.");
}
}
@Override
public <V1, R> KTable<K, R> join(final KTable<K, V1> other,
final ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
@ -492,12 +550,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final ProcessorParameters joinMergeProcessorParameters = new ProcessorParameters(joinMerge, joinMergeName);
kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters)
.withJoinOtherProcessorParameters(joinOtherProcessorParameters)
.withJoinThisProcessorParameters(joinThisProcessorParameters)
.withJoinThisStoreNames(valueGetterSupplier().storeNames())
.withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames())
.withOtherJoinSideNodeName(((KTableImpl) other).name)
.withThisJoinSideNodeName(name);
.withJoinOtherProcessorParameters(joinOtherProcessorParameters)
.withJoinThisProcessorParameters(joinThisProcessorParameters)
.withJoinThisStoreNames(valueGetterSupplier().storeNames())
.withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames())
.withOtherJoinSideNodeName(((KTableImpl) other).name)
.withThisJoinSideNodeName(name);
final KTableKTableJoinNode kTableKTableJoinNode = kTableJoinNodeBuilder.build();
builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode);
@ -526,10 +584,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
final String selectName = builder.newProcessorName(SELECT_NAME);
final KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
final ProcessorParameters processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(selectSupplier, selectName);
// select the aggregate key and values (old and new), it would require parent to send old values
final ProcessorGraphNode<K1, V1> groupByMapNode = new ProcessorGraphNode<>(
final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(
selectName,
processorParameters,
false

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.processor.ProcessorSupplier;
public final class GraphGraceSearchUtil {
private GraphGraceSearchUtil() {}
public static long findAndVerifyWindowGrace(final StreamsGraphNode streamsGraphNode) {
return findAndVerifyWindowGrace(streamsGraphNode, "");
}
private static long findAndVerifyWindowGrace(final StreamsGraphNode streamsGraphNode, final String chain) {
// error base case: we traversed off the end of the graph without finding a window definition
if (streamsGraphNode == null) {
throw new TopologyException(
"Window close time is only defined for windowed computations. Got [" + chain + "]."
);
}
// base case: return if this node defines a grace period.
{
final Long gracePeriod = extractGracePeriod(streamsGraphNode);
if (gracePeriod != null) {
return gracePeriod;
}
}
final String newChain = chain.equals("") ? streamsGraphNode.nodeName() : streamsGraphNode.nodeName() + "->" + chain;
if (streamsGraphNode.parentNodes().isEmpty()) {
// error base case: we traversed to the end of the graph without finding a window definition
throw new TopologyException(
"Window close time is only defined for windowed computations. Got [" + newChain + "]."
);
}
// recursive case: all parents must define a grace period, and we use the max of our parents' graces.
long inheritedGrace = -1;
for (final StreamsGraphNode parentNode : streamsGraphNode.parentNodes()) {
final long parentGrace = findAndVerifyWindowGrace(parentNode, newChain);
inheritedGrace = Math.max(inheritedGrace, parentGrace);
}
if (inheritedGrace == -1) {
throw new IllegalStateException(); // shouldn't happen, and it's not a legal grace period
}
return inheritedGrace;
}
private static Long extractGracePeriod(final StreamsGraphNode node) {
if (node instanceof StatefulProcessorNode) {
final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().processorSupplier();
if (processorSupplier instanceof KStreamWindowAggregate) {
final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier;
final Windows windows = kStreamWindowAggregate.windows();
return windows.gracePeriodMs();
} else if (processorSupplier instanceof KStreamSessionWindowAggregate) {
final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier;
final SessionWindows windows = kStreamSessionWindowAggregate.windows();
return windows.gracePeriodMs();
} else {
return null;
}
} else {
return null;
}
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.streams.kstream.Suppressed;
import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
abstract class BufferConfigImpl<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> {
public abstract long maxKeys();
public abstract long maxBytes();
@SuppressWarnings("unused")
public abstract BufferFullStrategy bufferFullStrategy();
@Override
public Suppressed.StrictBufferConfig withNoBound() {
return new StrictBufferConfigImpl(
Long.MAX_VALUE,
Long.MAX_VALUE,
SHUT_DOWN // doesn't matter, given the bounds
);
}
@Override
public Suppressed.StrictBufferConfig shutDownWhenFull() {
return new StrictBufferConfigImpl(maxKeys(), maxBytes(), SHUT_DOWN);
}
@Override
public Suppressed.BufferConfig emitEarlyWhenFull() {
return new EagerBufferConfigImpl(maxKeys(), maxBytes());
}
@Override
public Suppressed.StrictBufferConfig spillToDiskWhenFull() {
throw new UnsupportedOperationException("not implemented");
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.suppress;
public enum BufferFullStrategy {
EMIT,
SPILL_TO_DISK,
SHUT_DOWN
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.streams.kstream.Suppressed;
import java.util.Objects;
public class EagerBufferConfigImpl extends BufferConfigImpl {
private final long maxKeys;
private final long maxBytes;
public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) {
this.maxKeys = maxKeys;
this.maxBytes = maxBytes;
}
@Override
public Suppressed.BufferConfig withMaxRecords(final long recordLimit) {
return new EagerBufferConfigImpl(recordLimit, maxBytes);
}
@Override
public Suppressed.BufferConfig withMaxBytes(final long byteLimit) {
return new EagerBufferConfigImpl(maxKeys, byteLimit);
}
@Override
public long maxKeys() {
return maxKeys;
}
@Override
public long maxBytes() {
return maxBytes;
}
@Override
public BufferFullStrategy bufferFullStrategy() {
return BufferFullStrategy.EMIT;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o;
return maxKeys == that.maxKeys &&
maxBytes == that.maxBytes;
}
@Override
public int hashCode() {
return Objects.hash(maxKeys, maxBytes);
}
@Override
public String toString() {
return "EagerBufferConfigImpl{maxKeys=" + maxKeys + ", maxBytes=" + maxBytes + '}';
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.time.Duration;
import java.util.Objects;
public class FinalResultsSuppressionBuilder<K extends Windowed> implements Suppressed<K> {
private final StrictBufferConfig bufferConfig;
public FinalResultsSuppressionBuilder(final Suppressed.StrictBufferConfig bufferConfig) {
this.bufferConfig = bufferConfig;
}
public SuppressedImpl<K> buildFinalResultsSuppression(final Duration gracePeriod) {
return new SuppressedImpl<>(
gracePeriod,
bufferConfig,
(ProcessorContext context, K key) -> key.window().end()
);
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final FinalResultsSuppressionBuilder<?> that = (FinalResultsSuppressionBuilder<?>) o;
return Objects.equals(bufferConfig, that.bufferConfig);
}
@Override
public int hashCode() {
return Objects.hash(bufferConfig);
}
@Override
public String toString() {
return "FinalResultsSuppressionBuilder{bufferConfig=" + bufferConfig + '}';
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import java.time.Duration;
public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
private final SuppressedImpl<K> suppress;
private InternalProcessorContext internalProcessorContext;
public KTableSuppressProcessor(final SuppressedImpl<K> suppress) {
this.suppress = suppress;
}
@Override
public void init(final ProcessorContext context) {
internalProcessorContext = (InternalProcessorContext) context;
}
@Override
public void process(final K key, final Change<V> value) {
if (suppress.getTimeToWaitForMoreEvents() == Duration.ZERO && definedRecordTime(key) <= internalProcessorContext.streamTime()) {
internalProcessorContext.forward(key, value);
} else {
throw new NotImplementedException();
}
}
private long definedRecordTime(final K key) {
return suppress.getTimeDefinition().time(internalProcessorContext, key);
}
@Override
public void close() {
}
@Override
public String toString() {
return "KTableSuppressProcessor{suppress=" + suppress + '}';
}
static class NotImplementedException extends RuntimeException {
NotImplementedException() {
super();
}
}
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.streams.kstream.Suppressed;
import java.util.Objects;
import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
public class StrictBufferConfigImpl extends BufferConfigImpl<Suppressed.StrictBufferConfig> implements Suppressed.StrictBufferConfig {
private final long maxKeys;
private final long maxBytes;
private final BufferFullStrategy bufferFullStrategy;
public StrictBufferConfigImpl(final long maxKeys,
final long maxBytes,
final BufferFullStrategy bufferFullStrategy) {
this.maxKeys = maxKeys;
this.maxBytes = maxBytes;
this.bufferFullStrategy = bufferFullStrategy;
}
public StrictBufferConfigImpl() {
this.maxKeys = Long.MAX_VALUE;
this.maxBytes = Long.MAX_VALUE;
this.bufferFullStrategy = SHUT_DOWN;
}
@Override
public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) {
return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy);
}
@Override
public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) {
return new StrictBufferConfigImpl(maxKeys, byteLimit, bufferFullStrategy);
}
@Override
public long maxKeys() {
return maxKeys;
}
@Override
public long maxBytes() {
return maxBytes;
}
@Override
public BufferFullStrategy bufferFullStrategy() {
return bufferFullStrategy;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o;
return maxKeys == that.maxKeys &&
maxBytes == that.maxBytes &&
bufferFullStrategy == that.bufferFullStrategy;
}
@Override
public int hashCode() {
return Objects.hash(maxKeys, maxBytes, bufferFullStrategy);
}
@Override
public String toString() {
return "StrictBufferConfigImpl{maxKeys=" + maxKeys +
", maxBytes=" + maxBytes +
", bufferFullStrategy=" + bufferFullStrategy + '}';
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.time.Duration;
import java.util.Objects;
public class SuppressedImpl<K> implements Suppressed<K> {
private static final Duration DEFAULT_SUPPRESSION_TIME = Duration.ofMillis(Long.MAX_VALUE);
private static final StrictBufferConfig DEFAULT_BUFFER_CONFIG = BufferConfig.unbounded();
private final BufferConfig bufferConfig;
private final Duration timeToWaitForMoreEvents;
private final TimeDefinition<K> timeDefinition;
public SuppressedImpl(final Duration suppressionTime,
final BufferConfig bufferConfig,
final TimeDefinition<K> timeDefinition) {
this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime;
this.timeDefinition = timeDefinition == null ? (context, anyKey) -> context.timestamp() : timeDefinition;
this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : bufferConfig;
}
interface TimeDefinition<K> {
long time(final ProcessorContext context, final K key);
}
TimeDefinition<K> getTimeDefinition() {
return timeDefinition;
}
Duration getTimeToWaitForMoreEvents() {
return timeToWaitForMoreEvents == null ? Duration.ZERO : timeToWaitForMoreEvents;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final SuppressedImpl<?> that = (SuppressedImpl<?>) o;
return Objects.equals(bufferConfig, that.bufferConfig) &&
Objects.equals(getTimeToWaitForMoreEvents(), that.getTimeToWaitForMoreEvents()) &&
Objects.equals(getTimeDefinition(), that.getTimeDefinition());
}
@Override
public int hashCode() {
return Objects.hash(bufferConfig, getTimeToWaitForMoreEvents(), getTimeDefinition());
}
@Override
public String toString() {
return "SuppressedImpl{" +
", bufferConfig=" + bufferConfig +
", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents +
", timeDefinition=" + timeDefinition +
'}';
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams;
public class KeyValueTimestamp<K, V> {
private final K key;
private final V value;
private final long timestamp;
public KeyValueTimestamp(final K key, final V value, final long timestamp) {
this.key = key;
this.value = value;
this.timestamp = timestamp;
}
public K key() {
return key;
}
public V value() {
return value;
}
public long timestamp() {
return timestamp;
}
@Override
public String toString() {
return "KeyValueTimestamp{key=" + key + ", value=" + value + ", timestamp=" + timestamp + '}';
}
}

View File

@ -0,0 +1,280 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.time.Duration;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
@Category({IntegrationTest.class})
public class SuppressionIntegrationTest {
@ClassRule
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final Serde<String> STRING_SERDE = Serdes.String();
private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
private static final int COMMIT_INTERVAL = 100;
private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2;
@Test
public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws InterruptedException {
final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
final String input = "input" + testId;
final String outputSuppressed = "output-suppressed" + testId;
final String outputRaw = "output-raw" + testId;
cleanStateBeforeTest(input, outputSuppressed, outputRaw);
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Long> valueCounts = builder
.table(
input,
Consumed.with(STRING_SERDE, STRING_SERDE),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE)
.withCachingDisabled()
.withLoggingDisabled()
)
.groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE))
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled());
valueCounts
.suppress(untilTimeLimit(Duration.ZERO, unbounded()))
.toStream()
.to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
valueCounts
.toStream()
.to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
final KafkaStreams driver = getCleanStartedStreams(appId, builder);
try {
produceSynchronously(
input,
asList(
new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
new KeyValueTimestamp<>("x", "x", scaledTime(4L))
)
);
verifyOutput(
outputRaw,
asList(
new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
new KeyValueTimestamp<>("x", 1L, scaledTime(4L))
)
);
verifyOutput(
outputSuppressed,
asList(
new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
new KeyValueTimestamp<>("x", 1L, scaledTime(4L))
)
);
} finally {
driver.close();
cleanStateAfterTest(driver);
}
}
private void cleanStateBeforeTest(final String... topic) throws InterruptedException {
CLUSTER.deleteAllTopicsAndWait(30_000L);
for (final String s : topic) {
CLUSTER.createTopic(s, 1, 1);
}
}
private KafkaStreams getCleanStartedStreams(final String appId, final StreamsBuilder builder) {
final Properties streamsConfig = mkProperties(mkMap(
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.POLL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL)),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL))
));
final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig);
driver.cleanUp();
driver.start();
return driver;
}
private void cleanStateAfterTest(final KafkaStreams driver) throws InterruptedException {
driver.cleanUp();
CLUSTER.deleteAllTopicsAndWait(30_000L);
}
private long scaledTime(final long unscaledTime) {
return SCALE_FACTOR * unscaledTime;
}
private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
final Properties producerConfig = mkProperties(mkMap(
mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"),
mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()),
mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()),
mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers())
));
try (final Producer<String, String> producer = new KafkaProducer<>(producerConfig)) {
// TODO: test EOS
//noinspection ConstantConditions
if (false) {
producer.initTransactions();
producer.beginTransaction();
}
final LinkedList<Future<RecordMetadata>> futures = new LinkedList<>();
for (final KeyValueTimestamp<String, String> record : toProduce) {
final Future<RecordMetadata> f = producer.send(
new ProducerRecord<>(topic, null, record.timestamp(), record.key(), record.value(), null)
);
futures.add(f);
}
for (final Future<RecordMetadata> future : futures) {
try {
future.get();
} catch (final InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
// TODO: test EOS
//noinspection ConstantConditions
if (false) {
producer.commitTransaction();
} else {
producer.flush();
}
}
}
private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> expected) {
final List<ConsumerRecord<String, Long>> results;
try {
final Properties properties = mkProperties(
mkMap(
mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
)
);
results = IntegrationTestUtils.waitUntilMinRecordsReceived(properties, topic, expected.size());
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
if (results.size() != expected.size()) {
throw new AssertionError(printRecords(results) + " != " + expected);
}
final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = expected.iterator();
for (final ConsumerRecord<String, Long> result : results) {
final KeyValueTimestamp<String, Long> expected1 = expectedIterator.next();
try {
compareKeyValueTimestamp(result, expected1.key(), expected1.value(), expected1.timestamp());
} catch (final AssertionError e) {
throw new AssertionError(printRecords(results) + " != " + expected, e);
}
}
}
private <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> record, final K expectedKey, final V expectedValue, final long expectedTimestamp) {
Objects.requireNonNull(record);
final K recordKey = record.key();
final V recordValue = record.value();
final long recordTimestamp = record.timestamp();
final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp +
" but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp);
if (recordKey != null) {
if (!recordKey.equals(expectedKey)) {
throw error;
}
} else if (expectedKey != null) {
throw error;
}
if (recordValue != null) {
if (!recordValue.equals(expectedValue)) {
throw error;
}
} else if (expectedValue != null) {
throw error;
}
if (recordTimestamp != expectedTimestamp) {
throw error;
}
}
private <K, V> String printRecords(final List<ConsumerRecord<K, V>> result) {
final StringBuilder resultStr = new StringBuilder();
resultStr.append("[\n");
for (final ConsumerRecord<?, ?> record : result) {
resultStr.append(" ").append(record.toString()).append("\n");
}
resultStr.append("]");
return resultStr.toString();
}
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
import org.junit.Test;
import static java.lang.Long.MAX_VALUE;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class SuppressedTest {
@Test
public void bufferBuilderShouldBeConsistent() {
assertThat(
"noBound should remove bounds",
maxBytes(2L).withMaxRecords(4L).withNoBound(),
is(unbounded())
);
assertThat(
"keys alone should be set",
maxRecords(2L),
is(new EagerBufferConfigImpl(2L, MAX_VALUE))
);
assertThat(
"size alone should be set",
maxBytes(2L),
is(new EagerBufferConfigImpl(MAX_VALUE, 2L))
);
}
@Test
public void intermediateEventsShouldAcceptAnyBufferAndSetBounds() {
assertThat(
"time alone should be set",
untilTimeLimit(ofMillis(2), unbounded()),
is(new SuppressedImpl<>(ofMillis(2), unbounded(), null))
);
assertThat(
"time and unbounded buffer should be set",
untilTimeLimit(ofMillis(2), unbounded()),
is(new SuppressedImpl<>(ofMillis(2), unbounded(), null))
);
assertThat(
"time and keys buffer should be set",
untilTimeLimit(ofMillis(2), maxRecords(2)),
is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null))
);
assertThat(
"time and size buffer should be set",
untilTimeLimit(ofMillis(2), maxBytes(2)),
is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null))
);
assertThat(
"all constraints should be set",
untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)),
is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null))
);
}
@Test
public void finalEventsShouldAcceptStrictBuffersAndSetBounds() {
assertThat(
untilWindowCloses(unbounded()),
is(new FinalResultsSuppressionBuilder<>(unbounded()))
);
assertThat(
untilWindowCloses(maxRecords(2L).shutDownWhenFull()),
is(new FinalResultsSuppressionBuilder<>(new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN))
)
);
assertThat(
untilWindowCloses(maxBytes(2L).shutDownWhenFull()),
is(new FinalResultsSuppressionBuilder<>(new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN))
)
);
}
}

View File

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import static java.time.Duration.ZERO;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
public class SuppressScenarioTest {
private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
private static final Serde<String> STRING_SERDE = Serdes.String();
private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
@Test
public void shouldImmediatelyEmitEventsWithZeroEmitAfter() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Long> valueCounts = builder
.table(
"input",
Consumed.with(STRING_SERDE, STRING_SERDE),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE)
.withCachingDisabled()
.withLoggingDisabled()
)
.groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE))
.count();
valueCounts
.suppress(untilTimeLimit(ZERO, unbounded()))
.toStream()
.to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
valueCounts
.toStream()
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
final Topology topology = builder.build();
final Properties config = Utils.mkProperties(Utils.mkMap(
Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())),
Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
));
final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L));
verify(
drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("v1", 1L, 0L),
new KeyValueTimestamp<>("v1", 0L, 1L),
new KeyValueTimestamp<>("v2", 1L, 1L),
new KeyValueTimestamp<>("v1", 1L, 2L)
)
);
verify(
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("v1", 1L, 0L),
new KeyValueTimestamp<>("v1", 0L, 1L),
new KeyValueTimestamp<>("v2", 1L, 1L),
new KeyValueTimestamp<>("v1", 1L, 2L)
)
);
driver.pipeInput(recordFactory.create("input", "x", "x", 3L));
verify(
drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
singletonList(
new KeyValueTimestamp<>("x", 1L, 3L)
)
);
verify(
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
singletonList(
new KeyValueTimestamp<>("x", 1L, 3L)
)
);
driver.pipeInput(recordFactory.create("input", "x", "x", 4L));
verify(
drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("x", 0L, 4L),
new KeyValueTimestamp<>("x", 1L, 4L)
)
);
verify(
drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
asList(
new KeyValueTimestamp<>("x", 0L, 4L),
new KeyValueTimestamp<>("x", 1L, 4L)
)
);
}
}
private <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) {
if (results.size() != expectedResults.size()) {
throw new AssertionError(printRecords(results) + " != " + expectedResults);
}
final Iterator<KeyValueTimestamp<K, V>> expectedIterator = expectedResults.iterator();
for (final ProducerRecord<K, V> result : results) {
final KeyValueTimestamp<K, V> expected = expectedIterator.next();
try {
OutputVerifier.compareKeyValueTimestamp(result, expected.key(), expected.value(), expected.timestamp());
} catch (final AssertionError e) {
throw new AssertionError(printRecords(results) + " != " + expectedResults, e);
}
}
}
private <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
final List<ProducerRecord<K, V>> result = new LinkedList<>();
for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
next != null;
next = driver.readOutput(topic, keyDeserializer, valueDeserializer)) {
result.add(next);
}
return new ArrayList<>(result);
}
private <K, V> String printRecords(final List<ProducerRecord<K, V>> result) {
final StringBuilder resultStr = new StringBuilder();
resultStr.append("[\n");
for (final ProducerRecord<?, ?> record : result) {
resultStr.append(" ").append(record.toString()).append("\n");
}
resultStr.append("]");
return resultStr.toString();
}
}

View File

@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public class GraphGraceSearchUtilTest {
@Test
public void shouldThrowOnNull() {
try {
GraphGraceSearchUtil.findAndVerifyWindowGrace(null);
fail("Should have thrown.");
} catch (final TopologyException e) {
assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got []."));
}
}
@Test
public void shouldFailIfThereIsNoGraceAncestor() {
// doesn't matter if this ancestor is stateless or stateful. The important thing it that there is
// no grace period defined on any ancestor of the node
final StatefulProcessorNode<String, Long> gracelessAncestor = new StatefulProcessorNode<>(
"stateful",
new ProcessorParameters<>(
() -> new Processor<Object, Object>() {
@Override
public void init(final ProcessorContext context) {}
@Override
public void process(final Object key, final Object value) {}
@Override
public void close() {}
},
"dummy"
),
null,
null,
false
);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
gracelessAncestor.addChild(node);
try {
GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
fail("should have thrown.");
} catch (final TopologyException e) {
assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got [stateful->stateless]."));
}
}
@Test
public void shouldExtractGraceFromKStreamWindowAggregateNode() {
final TimeWindows windows = TimeWindows.of(10L).grace(1234L);
final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
"asdf",
new ProcessorParameters<>(
new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
windows,
"asdf",
null,
null
),
"asdf"
),
null,
null,
false
);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
assertThat(extracted, is(windows.gracePeriodMs()));
}
@Test
public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() {
final SessionWindows windows = SessionWindows.with(10L).grace(1234L);
final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>(
"asdf",
new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>(
windows,
"asdf",
null,
null,
null
),
"asdf"
),
null,
null,
false
);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
assertThat(extracted, is(windows.gracePeriodMs()));
}
@Test
public void shouldExtractGraceFromAncestorThroughStatefulParent() {
final SessionWindows windows = SessionWindows.with(10L).grace(1234L);
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
"asdf",
new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>(
windows, "asdf", null, null, null
), "asdf"),
null,
null,
false
);
final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>(
"stateful",
new ProcessorParameters<>(
() -> new Processor<Object, Object>() {
@Override
public void init(final ProcessorContext context) {}
@Override
public void process(final Object key, final Object value) {}
@Override
public void close() {}
},
"dummy"
),
null,
null,
false
);
graceGrandparent.addChild(statefulParent);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
statefulParent.addChild(node);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
assertThat(extracted, is(windows.gracePeriodMs()));
}
@Test
public void shouldExtractGraceFromAncestorThroughStatelessParent() {
final SessionWindows windows = SessionWindows.with(10L).grace(1234L);
final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>(
"asdf",
new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>(
windows,
"asdf",
null,
null,
null
),
"asdf"
),
null,
null,
false
);
final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>("stateless", null);
graceGrandparent.addChild(statelessParent);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
statelessParent.addChild(node);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
assertThat(extracted, is(windows.gracePeriodMs()));
}
@Test
public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() {
final StatefulProcessorNode<String, Long> leftParent = new StatefulProcessorNode<>(
"asdf",
new ProcessorParameters<>(
new KStreamSessionWindowAggregate<String, Long, Integer>(
SessionWindows.with(10L).grace(1234L),
"asdf",
null,
null,
null
),
"asdf"
),
null,
null,
false
);
final StatefulProcessorNode<String, Long> rightParent = new StatefulProcessorNode<>(
"asdf",
new ProcessorParameters<>(
new KStreamWindowAggregate<String, Long, Integer, TimeWindow>(
TimeWindows.of(10L).grace(4321L),
"asdf",
null,
null
),
"asdf"
),
null,
null,
false
);
final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null);
leftParent.addChild(node);
rightParent.addChild(node);
final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node);
assertThat(extracted, is(4321L));
}
}

View File

@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.junit.Test;
import java.time.Duration;
import java.util.Collection;
import static java.time.Duration.ZERO;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
@SuppressWarnings("PointlessArithmeticExpression")
public class KTableSuppressProcessorTest {
/**
* Use this value to indicate that the test correctness does not depend on any particular number
*/
private static final long ARBITRARY_LONG = 5L;
/**
* Use this value to indicate that the test correctness does not depend on any particular window
*/
private static final TimeWindow ARBITRARY_WINDOW = new TimeWindow(0L, 100L);
@Test
public void zeroTimeLimitShouldImmediatelyEmit() {
final KTableSuppressProcessor<String, Long> processor =
new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded())));
final MockInternalProcessorContext context = new MockInternalProcessorContext();
processor.init(context);
final long timestamp = ARBITRARY_LONG;
context.setTimestamp(timestamp);
context.setStreamTime(timestamp);
final String key = "hey";
final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
processor.process(key, value);
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
assertThat(capturedForward.timestamp(), is(timestamp));
}
@Test
public void windowedZeroTimeLimitShouldImmediatelyEmit() {
final KTableSuppressProcessor<Windowed<String>, Long> processor =
new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded())));
final MockInternalProcessorContext context = new MockInternalProcessorContext();
processor.init(context);
final long timestamp = ARBITRARY_LONG;
context.setTimestamp(timestamp);
context.setStreamTime(timestamp);
final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW);
final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
processor.process(key, value);
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
assertThat(capturedForward.timestamp(), is(timestamp));
}
@Test
public void intermediateSuppressionShouldThrow() {
final KTableSuppressProcessor<String, Long> processor =
new KTableSuppressProcessor<>(getImpl(untilTimeLimit(Duration.ofMillis(1), unbounded())));
final MockInternalProcessorContext context = new MockInternalProcessorContext();
processor.init(context);
try {
processor.process("hey", new Change<>(null, 1L));
fail("expected an exception for now");
} catch (final KTableSuppressProcessor.NotImplementedException e) {
// expected
}
assertThat(context.forwarded(), hasSize(0));
}
@SuppressWarnings("unchecked")
private <K extends Windowed> SuppressedImpl<K> finalResults(final Duration grace) {
return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace);
}
@Test
public void finalResultsSuppressionShouldThrow() {
final KTableSuppressProcessor<Windowed<String>, Long> processor =
new KTableSuppressProcessor<>(finalResults(ofMillis(1)));
final MockInternalProcessorContext context = new MockInternalProcessorContext();
processor.init(context);
context.setTimestamp(ARBITRARY_LONG);
try {
processor.process(new Windowed<>("hey", ARBITRARY_WINDOW), new Change<>(ARBITRARY_LONG, ARBITRARY_LONG));
fail("expected an exception for now");
} catch (final KTableSuppressProcessor.NotImplementedException e) {
// expected
}
assertThat(context.forwarded(), hasSize(0));
}
@Test
public void finalResultsWith0GraceBeforeWindowEndShouldThrow() {
final KTableSuppressProcessor<Windowed<String>, Long> processor =
new KTableSuppressProcessor<>(finalResults(ofMillis(0)));
final MockInternalProcessorContext context = new MockInternalProcessorContext();
processor.init(context);
final long timestamp = 5L;
context.setTimestamp(timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
try {
processor.process(key, value);
fail("expected an exception");
} catch (final KTableSuppressProcessor.NotImplementedException e) {
// expected
}
assertThat(context.forwarded(), hasSize(0));
}
@Test
public void finalResultsWith0GraceAtWindowEndShouldImmediatelyEmit() {
final KTableSuppressProcessor<Windowed<String>, Long> processor =
new KTableSuppressProcessor<>(finalResults(ofMillis(0)));
final MockInternalProcessorContext context = new MockInternalProcessorContext();
processor.init(context);
final long timestamp = 100L;
context.setTimestamp(timestamp);
context.setStreamTime(timestamp);
final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L));
final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG);
processor.process(key, value);
assertThat(context.forwarded(), hasSize(1));
final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0);
assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
assertThat(capturedForward.timestamp(), is(timestamp));
}
private static <E> Matcher<Collection<E>> hasSize(final int i) {
return new BaseMatcher<Collection<E>>() {
@Override
public void describeTo(final Description description) {
description.appendText("a collection of size " + i);
}
@SuppressWarnings("unchecked")
@Override
public boolean matches(final Object item) {
if (item == null) {
return false;
} else {
return ((Collection<E>) item).size() == i;
}
}
};
}
private static <K> SuppressedImpl<K> getImpl(final Suppressed<K> suppressed) {
return (SuppressedImpl<K>) suppressed;
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext {
private ProcessorNode currentNode;
private long streamTime;
@Override
public StreamsMetricsImpl metrics() {
return (StreamsMetricsImpl) super.metrics();
}
@Override
public ProcessorRecordContext recordContext() {
return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers());
}
@Override
public void setRecordContext(final ProcessorRecordContext recordContext) {
setRecordMetadata(
recordContext.topic(),
recordContext.partition(),
recordContext.offset(),
recordContext.headers(),
recordContext.timestamp()
);
}
@Override
public void setCurrentNode(final ProcessorNode currentNode) {
this.currentNode = currentNode;
}
@Override
public ProcessorNode currentNode() {
return currentNode;
}
@Override
public ThreadCache getCache() {
return null;
}
@Override
public void initialize() {
}
@Override
public void uninitialize() {
}
@Override
public long streamTime() {
return streamTime;
}
public void setStreamTime(final long streamTime) {
this.streamTime = streamTime;
}
}

View File

@ -405,13 +405,18 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
@SuppressWarnings("unchecked")
@Override
public <K, V> void forward(final K key, final V value) {
capturedForwards.add(new CapturedForward(To.all(), new KeyValue(key, value)));
forward(key, value, To.all());
}
@SuppressWarnings("unchecked")
@Override
public <K, V> void forward(final K key, final V value, final To to) {
capturedForwards.add(new CapturedForward(to, new KeyValue(key, value)));
capturedForwards.add(
new CapturedForward(
to.timestamp == -1 ? to.withTimestamp(timestamp == null ? -1 : timestamp) : to,
new KeyValue(key, value)
)
);
}
@SuppressWarnings("deprecation")