KAFKA-7502: Cleanup KTable materialization logic in a single place (filter) (#6453)

This PR is a follow-up to #6174; it covers the doFilter / doMapValues / doTransformValues methods.

Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
Lee Dongjin 2019-03-29 05:57:04 +09:00 committed by Guozhang Wang
parent b42d904f81
commit d63d702252
3 changed files with 62 additions and 49 deletions

View File

@ -71,7 +71,8 @@ class KTableFilter<K, V> implements KTableProcessorSupplier<K, V, V> {
super.init(context);
if (queryableName != null) {
store = (KeyValueStore<K, V>) context.getStateStore(queryableName);
tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context), sendOldValues);
tupleForwarder = new TupleForwarder<>(store, context,
new ForwardingCacheFlushListener<>(context), sendOldValues);
}
}

View File

@ -114,17 +114,36 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal,
final boolean filterNot) {
// we actually do not need to generate store names at all since if it is not specified, we will not
// materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility.
if (materializedInternal != null && materializedInternal.storeName() == null) {
builder.newStoreName(FILTER_NAME);
final Serde<K> keySerde;
final Serde<V> valueSerde;
final String queryableStoreName;
final StoreBuilder<KeyValueStore<K, V>> storeBuilder;
if (materializedInternal != null) {
// we actually do not need to generate store names at all since if it is not specified, we will not
// materialize the store; but we still need to burn one index BEFORE generating the processor to keep compatibility.
if (materializedInternal.storeName() == null) {
builder.newStoreName(FILTER_NAME);
}
// we can inherit parent key and value serde if user do not provide specific overrides, more specifically:
// we preserve the key following the order of 1) materialized, 2) parent
keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
// we preserve the value following the order of 1) materialized, 2) parent
valueSerde = materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : this.valSerde;
queryableStoreName = materializedInternal.queryableStoreName();
// only materialize if materialized is specified and it has queryable name
storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
} else {
keySerde = this.keySerde;
valueSerde = this.valSerde;
queryableStoreName = null;
storeBuilder = null;
}
final String name = builder.newProcessorName(FILTER_NAME);
// only materialize if the state store has queryable name
final String queryableName = materializedInternal != null ? materializedInternal.queryableStoreName() : null;
final KTableProcessorSupplier<K, V, V> processorSupplier = new KTableFilter<>(this, predicate, filterNot, queryableName);
final KTableProcessorSupplier<K, V, V> processorSupplier =
new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
final ProcessorParameters<K, V> processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType(
new ProcessorParameters<>(processorSupplier, name)
@ -133,20 +152,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final StreamsGraphNode tableNode = new TableProcessorNode<>(
name,
processorParameters,
materializedInternal,
null
storeBuilder
);
builder.addGraphNode(this.streamsGraphNode, tableNode);
// we can inherit parent key and value serde if user do not provide specific overrides, more specifically:
// we preserve the key following the order of 1) materialized, 2) parent
// we preserve the value following the order of 1) materialized, 2) parent
return new KTableImpl<>(name,
materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
materializedInternal != null && materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : valSerde,
keySerde,
valueSerde,
sourceNodes,
queryableName,
queryableStoreName,
processorSupplier,
tableNode,
builder);
@ -195,8 +210,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final String name = builder.newProcessorName(MAPVALUES_NAME);
// only materialize if the state store has queryable name
final String queryableName = materializedInternal != null ? materializedInternal.queryableStoreName() : null;
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this, mapper, queryableName);
final String queryableStoreName = materializedInternal != null ? materializedInternal.queryableStoreName() : null;
final StoreBuilder<KeyValueStore<K, VR>> storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableMapValues<>(this, mapper, queryableStoreName);
// leaving in calls to ITB until building topology with graph
@ -206,8 +222,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final StreamsGraphNode tableNode = new TableProcessorNode<>(
name,
processorParameters,
materializedInternal,
null
storeBuilder
);
builder.addGraphNode(this.streamsGraphNode, tableNode);
@ -220,7 +235,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
materializedInternal != null ? materializedInternal.valueSerde() : null,
sourceNodes,
queryableName,
queryableStoreName,
processorSupplier,
tableNode,
builder
@ -277,14 +292,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
}
private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
final String... stateStoreNames) {
Objects.requireNonNull(stateStoreNames, "stateStoreNames");
final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
// only materialize if users provide a specific queryable name
final String queryableStoreName = materialized != null ? materialized.queryableStoreName() : null;
final String queryableStoreName = materializedInternal != null ? materializedInternal.queryableStoreName() : null;
final StoreBuilder<KeyValueStore<K, VR>> storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(
this,
@ -298,7 +314,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final StreamsGraphNode tableNode = new TableProcessorNode<>(
name,
processorParameters,
materialized,
storeBuilder,
stateStoreNames
);
@ -309,8 +325,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
// we preserve the value following the order of 1) materialized, 2) null
return new KTableImpl<>(
name,
materialized != null && materialized.keySerde() != null ? materialized.keySerde() : keySerde,
materialized != null ? materialized.valueSerde() : null,
materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
materializedInternal != null ? materializedInternal.valueSerde() : null,
sourceNodes,
queryableStoreName,
processorSupplier,

View File

@ -17,39 +17,41 @@
package org.apache.kafka.streams.kstream.internals.graph;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Arrays;
public class TableProcessorNode<K, V, S extends StateStore> extends StreamsGraphNode {
public class TableProcessorNode<K, V> extends StreamsGraphNode {
private final MaterializedInternal<K, V, S> materializedInternal;
private final ProcessorParameters<K, V> processorParameters;
private final StoreBuilder<KeyValueStore<K, V>> storeBuilder;
private final String[] storeNames;
// NOTE(review): this span is rendered as a diff, so the pre-change parameter lines
// (materializedInternal / storeNames) appear next to the post-change storeBuilder parameter.
// Post-change overload: takes no store names and delegates with null store names.
public TableProcessorNode(final String nodeName,
final ProcessorParameters<K, V> processorParameters,
final MaterializedInternal<K, V, S> materializedInternal,
final String[] storeNames) {
final StoreBuilder<KeyValueStore<K, V>> storeBuilder) {
this(nodeName, processorParameters, storeBuilder, null);
}
// Primary constructor: stores the processor parameters, the store builder (which callers may
// pass as null when the node is not materialized), and the names of the state stores that
// buildAndAddToTopology connects to this processor.
public TableProcessorNode(final String nodeName,
final ProcessorParameters<K, V> processorParameters,
final StoreBuilder<KeyValueStore<K, V>> storeBuilder,
final String[] storeNames) {
super(nodeName);
this.processorParameters = processorParameters;
this.materializedInternal = materializedInternal;
// a null storeNames is replaced with an empty array so the field is never null
this.storeNames = storeNames != null ? storeNames : new String[]{};
this.storeBuilder = storeBuilder;
this.storeNames = storeNames != null ? storeNames : new String[] {};
}
@Override
public String toString() {
return "TableProcessorNode{" +
"materializedInternal=" + materializedInternal +
", processorParameters=" + processorParameters +
", storeNames=" + Arrays.toString(storeNames) +
"} " + super.toString();
", processorParameters=" + processorParameters +
", storeBuilder=" + storeBuilder.name() +
", storeNames=" + Arrays.toString(storeNames) +
"} " + super.toString();
}
@SuppressWarnings("unchecked")
@ -62,15 +64,9 @@ public class TableProcessorNode<K, V, S extends StateStore> extends StreamsGraph
topologyBuilder.connectProcessorAndStateStores(processorName, storeNames);
}
// only materialize if materialized is specified and it is queryable
final boolean shouldMaterialize = materializedInternal != null && materializedInternal.queryableStoreName() != null;
if (shouldMaterialize) {
// TODO: we are enforcing this as a keyvalue store, but it should go beyond any type of stores
topologyBuilder.addStateStore(
new KeyValueStoreMaterializer<>(
(MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal
).materialize(),
processorName);
// TODO: we are enforcing this as a keyvalue store, but it should go beyond any type of stores
if (storeBuilder != null) {
topologyBuilder.addStateStore(storeBuilder, processorName);
}
}
}