[KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (#6461)

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
Ted Yu 2019-04-20 11:30:20 -07:00 committed by Matthias J. Sax
parent 964e90a725
commit e56ebbffca
13 changed files with 135 additions and 17 deletions

View File

@ -68,7 +68,7 @@
</div>
<div class="section" id="overriding-default-serdes">
<h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2>
<p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p>
<p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default SerDe settings. For this case, Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>

View File

@ -3547,7 +3547,8 @@ val clicksPerRegion: KTable[String, Long] =
// Write the (continuously updating) results to the output topic.
clicksPerRegion.toStream.to(outputTopic)
</pre>
<p>A complete example of user-defined SerDes can be found in a test class within the library.</p>
<p>A complete example of user-defined SerDes can be found in a test class within the library.
Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
</div>
</div>
</div>

View File

@ -49,6 +49,8 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@ -634,6 +636,26 @@ public class KafkaStreams implements AutoCloseable {
this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
}
@SuppressWarnings("unchecked")
private void configureSerDes(final Set<SinkNode> sinks, final Set<SourceNode> sources) {
    // KAFKA-3729: serdes passed in explicitly alongside the topology (rather than picked
    // up from the config defaults) would otherwise never see configure(); call it here so
    // users do not need to invoke configure() manually.
    // Fetch the app config once up front instead of once per serializer/deserializer.
    final java.util.Map<String, Object> serdeConfig = config.originals();
    for (final SinkNode sink : sinks) {
        if (sink.getKeySerializer() != null) {
            sink.getKeySerializer().configure(serdeConfig, true);   // true -> configuring a key serde
        }
        if (sink.getValueSerializer() != null) {
            sink.getValueSerializer().configure(serdeConfig, false); // false -> value serde
        }
    }
    for (final SourceNode source : sources) {
        if (source.getKeyDeSerializer() != null) {
            source.getKeyDeSerializer().configure(serdeConfig, true);
        }
        if (source.getValueDeSerializer() != null) {
            source.getValueDeSerializer().configure(serdeConfig, false);
        }
    }
}
private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
final StreamsConfig config,
final KafkaClientSupplier clientSupplier,
@ -670,6 +692,7 @@ public class KafkaStreams implements AutoCloseable {
// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
final ProcessorTopology taskTopology = internalTopologyBuilder.build();
configureSerDes(taskTopology.sinks(), taskTopology.sources());
streamsMetadataState = new StreamsMetadataState(
internalTopologyBuilder,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
@ -683,6 +706,7 @@ public class KafkaStreams implements AutoCloseable {
log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
}
final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
(globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
@ -696,6 +720,8 @@ public class KafkaStreams implements AutoCloseable {
final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
GlobalStreamThread.State globalThreadState = null;
if (globalTaskTopology != null) {
configureSerDes(globalTaskTopology.sinks(), globalTaskTopology.sources());
final String globalThreadId = clientId + "-GlobalStreamThread";
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
config,

View File

@ -44,6 +44,14 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
this.partitioner = partitioner;
}
/**
 * Returns the key serializer explicitly attached to this sink node; may be
 * {@code null} when no explicit serializer was provided (callers null-check it).
 * Exposed so {@code KafkaStreams} can auto-configure user-supplied serdes (KAFKA-3729).
 */
public Serializer<K> getKeySerializer() {
    return keySerializer;
}
/**
 * Returns the value serializer explicitly attached to this sink node; may be
 * {@code null} when no explicit serializer was provided (callers null-check it).
 * Exposed so {@code KafkaStreams} can auto-configure user-supplied serdes (KAFKA-3729).
 */
public Serializer<V> getValueSerializer() {
    return valSerializer;
}
/**
* @throws UnsupportedOperationException if this method adds a child to a sink node
*/

View File

@ -52,6 +52,14 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
this(name, topics, null, keyDeserializer, valDeserializer);
}
/**
 * Returns the key deserializer explicitly attached to this source node; may be
 * {@code null} when no explicit deserializer was provided (callers null-check it).
 * Exposed so {@code KafkaStreams} can auto-configure user-supplied serdes (KAFKA-3729).
 * (The "DeSerializer" capitalization is kept for compatibility with existing callers.)
 */
public Deserializer<K> getKeyDeSerializer() {
    return keyDeserializer;
}
/**
 * Returns the value deserializer explicitly attached to this source node; may be
 * {@code null} when no explicit deserializer was provided (callers null-check it).
 * Exposed so {@code KafkaStreams} can auto-configure user-supplied serdes (KAFKA-3729).
 * (The "DeSerializer" capitalization is kept for compatibility with existing callers.)
 */
public Deserializer<V> getValueDeSerializer() {
    return valDeserializer;
}
/**
 * Deserializes the raw key bytes of a record read from {@code topic}, delegating to
 * this node's {@code keyDeserializer} (record headers are forwarded to the deserializer).
 */
K deserializeKey(final String topic, final Headers headers, final byte[] data) {
    return keyDeserializer.deserialize(topic, headers, data);
}

View File

@ -115,10 +115,23 @@ public class MeteredKeyValueStore<K, V>
@SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
final Serde<K> usedKeySerde;
final Serde<V> usedValueSerde;
final Map<String, Object> conf = context.appConfigs();
if (keySerde == null) {
usedKeySerde = (Serde<K>) context.keySerde();
} else {
usedKeySerde = keySerde;
usedKeySerde.configure(conf, true);
}
if (valueSerde == null) {
usedValueSerde = (Serde<V>) context.valueSerde();
} else {
usedValueSerde = valueSerde;
usedValueSerde.configure(conf, false);
}
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
}
@SuppressWarnings("unchecked")

View File

@ -69,10 +69,23 @@ public class MeteredSessionStore<K, V>
public void init(final ProcessorContext context,
final StateStore root) {
//noinspection unchecked
final Serde<K> usedKeySerde;
final Serde<V> usedValueSerde;
final Map<String, Object> conf = context.appConfigs();
if (keySerde == null) {
usedKeySerde = (Serde<K>) context.keySerde();
} else {
usedKeySerde = keySerde;
usedKeySerde.configure(conf, true);
}
if (valueSerde == null) {
usedValueSerde = (Serde<V>) context.valueSerde();
} else {
usedValueSerde = valueSerde;
usedValueSerde.configure(conf, false);
}
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString();

View File

@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import java.util.Map;
/**
* A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its
* inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
@ -48,9 +50,22 @@ public class MeteredTimestampedKeyValueStore<K, V>
@SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
final Serde<K> usedKeySerde;
final Serde<ValueAndTimestamp<V>> usedValueSerde;
final Map<String, Object> conf = context.appConfigs();
if (keySerde == null) {
usedKeySerde = (Serde<K>) context.keySerde();
} else {
usedKeySerde = keySerde;
usedKeySerde.configure(conf, true);
}
if (valueSerde == null) {
usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde();
} else {
usedValueSerde = valueSerde;
usedValueSerde.configure(conf, false);
}
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
}
}

View File

@ -26,6 +26,8 @@ import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import java.util.Map;
/**
* A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
* inner WindowStore implementation do not need to provide its own metrics collecting functionality.
@ -50,9 +52,22 @@ class MeteredTimestampedWindowStore<K, V>
@SuppressWarnings("unchecked")
@Override
void initStoreSerde(final ProcessorContext context) {
final Serde<K> usedKeySerde;
final Serde<ValueAndTimestamp<V>> usedValueSerde;
final Map<String, Object> conf = context.appConfigs();
if (keySerde == null) {
usedKeySerde = (Serde<K>) context.keySerde();
} else {
usedKeySerde = keySerde;
usedKeySerde.configure(conf, true);
}
if (valueSerde == null) {
usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde();
} else {
usedValueSerde = valueSerde;
usedValueSerde.configure(conf, false);
}
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
}
}

View File

@ -99,10 +99,23 @@ public class MeteredWindowStore<K, V>
@SuppressWarnings("unchecked")
void initStoreSerde(final ProcessorContext context) {
final Serde<K> usedKeySerde;
final Serde<V> usedValueSerde;
final Map<String, Object> conf = context.appConfigs();
if (keySerde == null) {
usedKeySerde = (Serde<K>) context.keySerde();
} else {
usedKeySerde = keySerde;
usedKeySerde.configure(conf, true);
}
if (valueSerde == null) {
usedValueSerde = (Serde<V>) context.valueSerde();
} else {
usedValueSerde = valueSerde;
usedValueSerde.configure(conf, false);
}
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
}
@SuppressWarnings("unchecked")

View File

@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@ -92,6 +93,7 @@ public class MeteredKeyValueStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
expect(context.taskId()).andReturn(taskId);
expect(context.appConfigs()).andReturn(new HashMap<>());
expect(inner.name()).andReturn("metered").anyTimes();
}

View File

@ -42,6 +42,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@ -91,6 +92,7 @@ public class MeteredSessionStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
expect(context.taskId()).andReturn(taskId);
expect(context.appConfigs()).andReturn(new HashMap<>());
expect(inner.name()).andReturn("metered").anyTimes();
}

View File

@ -41,6 +41,7 @@ import org.junit.runner.RunWith;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@ -94,6 +95,7 @@ public class MeteredTimestampedKeyValueStoreTest {
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
expect(context.taskId()).andReturn(taskId);
expect(context.appConfigs()).andReturn(new HashMap<>());
expect(inner.name()).andReturn("metered").anyTimes();
}