Cleanup KTableImpl#doTransformValues (#6519)

This PR is a follow-up of #6174 and #6453, which cleans up KTableImpl#doTransformValues method.

Reviewers: Bill Bejeck <bbejeck@gmail.com>
This commit is contained in:
Lee Dongjin 2019-03-30 06:08:20 +09:00 committed by Bill Bejeck
parent 17f73b42ad
commit d10023e8d3
1 changed files with 22 additions and 9 deletions

View File

@ -295,13 +295,29 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
final String... stateStoreNames) {
Objects.requireNonNull(stateStoreNames, "stateStoreNames");
final Serde<K> keySerde;
final Serde<VR> valueSerde;
final String queryableStoreName;
final StoreBuilder<KeyValueStore<K, VR>> storeBuilder;
if (materializedInternal != null) {
// don't inherit parent value serde, since this operation may change the value type, more specifically:
// we preserve the key following the order of 1) materialized, 2) parent, 3) null
keySerde = materializedInternal.keySerde() != null ? materializedInternal.keySerde() : this.keySerde;
// we preserve the value following the order of 1) materialized, 2) null
valueSerde = materializedInternal.valueSerde();
queryableStoreName = materializedInternal.queryableStoreName();
// only materialize if materialized is specified and it has queryable name
storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
} else {
keySerde = this.keySerde;
valueSerde = null;
queryableStoreName = null;
storeBuilder = null;
}
final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
// only materialize if users provide a specific queryable name
final String queryableStoreName = materializedInternal != null ? materializedInternal.queryableStoreName() : null;
final StoreBuilder<KeyValueStore<K, VR>> storeBuilder = queryableStoreName != null ? (new KeyValueStoreMaterializer<>(materializedInternal)).materialize() : null;
final KTableProcessorSupplier<K, V, VR> processorSupplier = new KTableTransformValues<>(
this,
transformerSupplier,
@ -320,13 +336,10 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
builder.addGraphNode(this.streamsGraphNode, tableNode);
// don't inherit parent value serde, since this operation may change the value type, more specifically:
// we preserve the key following the order of 1) materialized, 2) parent, 3) null
// we preserve the value following the order of 1) materialized, 2) null
return new KTableImpl<>(
name,
materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde,
materializedInternal != null ? materializedInternal.valueSerde() : null,
keySerde,
valueSerde,
sourceNodes,
queryableStoreName,
processorSupplier,