mirror of https://github.com/apache/kafka.git
KAFKA-3729: Revert adding Serde auto-config (#6630)
* Revert "MINOR: Add unit test for SerDe auto-configuration (#6610)" This reverts commit172fbb2dd5
. * Revert "[KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (#6461)" This reverts commite56ebbffca
. The two merged PRs introduce a breaking change. Reverting to preserve backward compatibility. Jira ticket reopened. Reviewers: Ted Yu <yuzhihong@gmail.com>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
parent
c5665e6945
commit
56c64803fa
|
@ -68,7 +68,7 @@
|
||||||
</div>
|
</div>
|
||||||
<div class="section" id="overriding-default-serdes">
|
<div class="section" id="overriding-default-serdes">
|
||||||
<h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2>
|
<h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2>
|
||||||
<p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default SerDe settings. For this case, Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
|
<p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p>
|
||||||
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
|
<div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
|
||||||
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
|
<span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
|
||||||
|
|
||||||
|
|
|
@ -3547,8 +3547,7 @@ val clicksPerRegion: KTable[String, Long] =
|
||||||
// Write the (continuously updating) results to the output topic.
|
// Write the (continuously updating) results to the output topic.
|
||||||
clicksPerRegion.toStream.to(outputTopic)
|
clicksPerRegion.toStream.to(outputTopic)
|
||||||
</pre>
|
</pre>
|
||||||
<p>A complete example of user-defined SerDes can be found in a test class within the library.
|
<p>A complete example of user-defined SerDes can be found in a test class within the library.</p>
|
||||||
Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
|
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
|
@ -49,8 +49,6 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
|
||||||
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
|
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
|
||||||
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
|
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
|
||||||
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
|
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
|
||||||
import org.apache.kafka.streams.processor.internals.SinkNode;
|
|
||||||
import org.apache.kafka.streams.processor.internals.SourceNode;
|
|
||||||
import org.apache.kafka.streams.processor.internals.StateDirectory;
|
import org.apache.kafka.streams.processor.internals.StateDirectory;
|
||||||
import org.apache.kafka.streams.processor.internals.StreamThread;
|
import org.apache.kafka.streams.processor.internals.StreamThread;
|
||||||
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
|
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
|
||||||
|
@ -636,26 +634,6 @@ public class KafkaStreams implements AutoCloseable {
|
||||||
this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
|
this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private void configureSerDes(final Set<SinkNode> sinks, final Set<SourceNode> sources) {
|
|
||||||
for (final SinkNode sn : sinks) {
|
|
||||||
if (sn.getKeySerializer() != null) {
|
|
||||||
sn.getKeySerializer().configure(config.originals(), true);
|
|
||||||
}
|
|
||||||
if (sn.getValueSerializer() != null) {
|
|
||||||
sn.getValueSerializer().configure(config.originals(), false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for (final SourceNode sn : sources) {
|
|
||||||
if (sn.getKeyDeserializer() != null) {
|
|
||||||
sn.getKeyDeserializer().configure(config.originals(), true);
|
|
||||||
}
|
|
||||||
if (sn.getValueDeserializer() != null) {
|
|
||||||
sn.getValueDeserializer().configure(config.originals(), false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
|
private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
|
||||||
final StreamsConfig config,
|
final StreamsConfig config,
|
||||||
final KafkaClientSupplier clientSupplier,
|
final KafkaClientSupplier clientSupplier,
|
||||||
|
@ -692,7 +670,6 @@ public class KafkaStreams implements AutoCloseable {
|
||||||
// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
|
// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
|
||||||
final ProcessorTopology taskTopology = internalTopologyBuilder.build();
|
final ProcessorTopology taskTopology = internalTopologyBuilder.build();
|
||||||
|
|
||||||
configureSerDes(taskTopology.sinks(), taskTopology.sources());
|
|
||||||
streamsMetadataState = new StreamsMetadataState(
|
streamsMetadataState = new StreamsMetadataState(
|
||||||
internalTopologyBuilder,
|
internalTopologyBuilder,
|
||||||
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
|
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
|
||||||
|
@ -706,7 +683,6 @@ public class KafkaStreams implements AutoCloseable {
|
||||||
log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
|
log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
|
||||||
}
|
}
|
||||||
final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
|
final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
|
||||||
|
|
||||||
final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
|
final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
|
||||||
final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
|
final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
|
||||||
(globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
|
(globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
|
||||||
|
@ -720,8 +696,6 @@ public class KafkaStreams implements AutoCloseable {
|
||||||
final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
|
final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
|
||||||
GlobalStreamThread.State globalThreadState = null;
|
GlobalStreamThread.State globalThreadState = null;
|
||||||
if (globalTaskTopology != null) {
|
if (globalTaskTopology != null) {
|
||||||
configureSerDes(globalTaskTopology.sinks(), globalTaskTopology.sources());
|
|
||||||
|
|
||||||
final String globalThreadId = clientId + "-GlobalStreamThread";
|
final String globalThreadId = clientId + "-GlobalStreamThread";
|
||||||
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
|
globalStreamThread = new GlobalStreamThread(globalTaskTopology,
|
||||||
config,
|
config,
|
||||||
|
|
|
@ -44,14 +44,6 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
|
||||||
this.partitioner = partitioner;
|
this.partitioner = partitioner;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Serializer getKeySerializer() {
|
|
||||||
return keySerializer;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Serializer getValueSerializer() {
|
|
||||||
return valSerializer;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws UnsupportedOperationException if this method adds a child to a sink node
|
* @throws UnsupportedOperationException if this method adds a child to a sink node
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -52,14 +52,6 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
|
||||||
this(name, topics, null, keyDeserializer, valDeserializer);
|
this(name, topics, null, keyDeserializer, valDeserializer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Deserializer getKeyDeserializer() {
|
|
||||||
return keyDeserializer;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Deserializer getValueDeserializer() {
|
|
||||||
return valDeserializer;
|
|
||||||
}
|
|
||||||
|
|
||||||
K deserializeKey(final String topic, final Headers headers, final byte[] data) {
|
K deserializeKey(final String topic, final Headers headers, final byte[] data) {
|
||||||
return keyDeserializer.deserialize(topic, headers, data);
|
return keyDeserializer.deserialize(topic, headers, data);
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,23 +115,10 @@ public class MeteredKeyValueStore<K, V>
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
void initStoreSerde(final ProcessorContext context) {
|
void initStoreSerde(final ProcessorContext context) {
|
||||||
final Serde<K> usedKeySerde;
|
|
||||||
final Serde<V> usedValueSerde;
|
|
||||||
final Map<String, Object> conf = context.appConfigs();
|
|
||||||
if (keySerde == null) {
|
|
||||||
usedKeySerde = (Serde<K>) context.keySerde();
|
|
||||||
} else {
|
|
||||||
usedKeySerde = keySerde;
|
|
||||||
usedKeySerde.configure(conf, true);
|
|
||||||
}
|
|
||||||
if (valueSerde == null) {
|
|
||||||
usedValueSerde = (Serde<V>) context.valueSerde();
|
|
||||||
} else {
|
|
||||||
usedValueSerde = valueSerde;
|
|
||||||
usedValueSerde.configure(conf, false);
|
|
||||||
}
|
|
||||||
serdes = new StateSerdes<>(
|
serdes = new StateSerdes<>(
|
||||||
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
|
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
|
||||||
|
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
|
||||||
|
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
|
@ -69,23 +69,10 @@ public class MeteredSessionStore<K, V>
|
||||||
public void init(final ProcessorContext context,
|
public void init(final ProcessorContext context,
|
||||||
final StateStore root) {
|
final StateStore root) {
|
||||||
//noinspection unchecked
|
//noinspection unchecked
|
||||||
final Serde<K> usedKeySerde;
|
|
||||||
final Serde<V> usedValueSerde;
|
|
||||||
final Map<String, Object> conf = context.appConfigs();
|
|
||||||
if (keySerde == null) {
|
|
||||||
usedKeySerde = (Serde<K>) context.keySerde();
|
|
||||||
} else {
|
|
||||||
usedKeySerde = keySerde;
|
|
||||||
usedKeySerde.configure(conf, true);
|
|
||||||
}
|
|
||||||
if (valueSerde == null) {
|
|
||||||
usedValueSerde = (Serde<V>) context.valueSerde();
|
|
||||||
} else {
|
|
||||||
usedValueSerde = valueSerde;
|
|
||||||
usedValueSerde.configure(conf, false);
|
|
||||||
}
|
|
||||||
serdes = new StateSerdes<>(
|
serdes = new StateSerdes<>(
|
||||||
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
|
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
|
||||||
|
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
|
||||||
|
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
|
||||||
metrics = (StreamsMetricsImpl) context.metrics();
|
metrics = (StreamsMetricsImpl) context.metrics();
|
||||||
|
|
||||||
taskName = context.taskId().toString();
|
taskName = context.taskId().toString();
|
||||||
|
|
|
@ -26,8 +26,6 @@ import org.apache.kafka.streams.state.StateSerdes;
|
||||||
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
|
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
|
||||||
import org.apache.kafka.streams.state.ValueAndTimestamp;
|
import org.apache.kafka.streams.state.ValueAndTimestamp;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its
|
* A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its
|
||||||
* inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
|
* inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
|
||||||
|
@ -50,22 +48,9 @@ public class MeteredTimestampedKeyValueStore<K, V>
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
void initStoreSerde(final ProcessorContext context) {
|
void initStoreSerde(final ProcessorContext context) {
|
||||||
final Serde<K> usedKeySerde;
|
|
||||||
final Serde<ValueAndTimestamp<V>> usedValueSerde;
|
|
||||||
final Map<String, Object> conf = context.appConfigs();
|
|
||||||
if (keySerde == null) {
|
|
||||||
usedKeySerde = (Serde<K>) context.keySerde();
|
|
||||||
} else {
|
|
||||||
usedKeySerde = keySerde;
|
|
||||||
usedKeySerde.configure(conf, true);
|
|
||||||
}
|
|
||||||
if (valueSerde == null) {
|
|
||||||
usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde();
|
|
||||||
} else {
|
|
||||||
usedValueSerde = valueSerde;
|
|
||||||
usedValueSerde.configure(conf, false);
|
|
||||||
}
|
|
||||||
serdes = new StateSerdes<>(
|
serdes = new StateSerdes<>(
|
||||||
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
|
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
|
||||||
|
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
|
||||||
|
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -26,8 +26,6 @@ import org.apache.kafka.streams.state.TimestampedWindowStore;
|
||||||
import org.apache.kafka.streams.state.ValueAndTimestamp;
|
import org.apache.kafka.streams.state.ValueAndTimestamp;
|
||||||
import org.apache.kafka.streams.state.WindowStore;
|
import org.apache.kafka.streams.state.WindowStore;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
|
* A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
|
||||||
* inner WindowStore implementation do not need to provide its own metrics collecting functionality.
|
* inner WindowStore implementation do not need to provide its own metrics collecting functionality.
|
||||||
|
@ -52,22 +50,9 @@ class MeteredTimestampedWindowStore<K, V>
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
void initStoreSerde(final ProcessorContext context) {
|
void initStoreSerde(final ProcessorContext context) {
|
||||||
final Serde<K> usedKeySerde;
|
|
||||||
final Serde<ValueAndTimestamp<V>> usedValueSerde;
|
|
||||||
final Map<String, Object> conf = context.appConfigs();
|
|
||||||
if (keySerde == null) {
|
|
||||||
usedKeySerde = (Serde<K>) context.keySerde();
|
|
||||||
} else {
|
|
||||||
usedKeySerde = keySerde;
|
|
||||||
usedKeySerde.configure(conf, true);
|
|
||||||
}
|
|
||||||
if (valueSerde == null) {
|
|
||||||
usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde();
|
|
||||||
} else {
|
|
||||||
usedValueSerde = valueSerde;
|
|
||||||
usedValueSerde.configure(conf, false);
|
|
||||||
}
|
|
||||||
serdes = new StateSerdes<>(
|
serdes = new StateSerdes<>(
|
||||||
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
|
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
|
||||||
|
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
|
||||||
|
valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,23 +99,10 @@ public class MeteredWindowStore<K, V>
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
void initStoreSerde(final ProcessorContext context) {
|
void initStoreSerde(final ProcessorContext context) {
|
||||||
final Serde<K> usedKeySerde;
|
|
||||||
final Serde<V> usedValueSerde;
|
|
||||||
final Map<String, Object> conf = context.appConfigs();
|
|
||||||
if (keySerde == null) {
|
|
||||||
usedKeySerde = (Serde<K>) context.keySerde();
|
|
||||||
} else {
|
|
||||||
usedKeySerde = keySerde;
|
|
||||||
usedKeySerde.configure(conf, true);
|
|
||||||
}
|
|
||||||
if (valueSerde == null) {
|
|
||||||
usedValueSerde = (Serde<V>) context.valueSerde();
|
|
||||||
} else {
|
|
||||||
usedValueSerde = valueSerde;
|
|
||||||
usedValueSerde.configure(conf, false);
|
|
||||||
}
|
|
||||||
serdes = new StateSerdes<>(
|
serdes = new StateSerdes<>(
|
||||||
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
|
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
|
||||||
|
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
|
||||||
|
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
|
@ -25,10 +25,7 @@ import org.apache.kafka.common.Node;
|
||||||
import org.apache.kafka.common.config.ConfigException;
|
import org.apache.kafka.common.config.ConfigException;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.network.Selectable;
|
import org.apache.kafka.common.network.Selectable;
|
||||||
import org.apache.kafka.common.serialization.Deserializer;
|
|
||||||
import org.apache.kafka.common.serialization.Serde;
|
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
import org.apache.kafka.common.serialization.Serializer;
|
|
||||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
import org.apache.kafka.common.serialization.StringSerializer;
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
@ -36,9 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException;
|
||||||
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
|
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
|
||||||
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
|
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
|
||||||
import org.apache.kafka.streams.kstream.Consumed;
|
import org.apache.kafka.streams.kstream.Consumed;
|
||||||
import org.apache.kafka.streams.kstream.Grouped;
|
|
||||||
import org.apache.kafka.streams.kstream.Materialized;
|
import org.apache.kafka.streams.kstream.Materialized;
|
||||||
import org.apache.kafka.streams.kstream.Produced;
|
|
||||||
import org.apache.kafka.streams.processor.AbstractProcessor;
|
import org.apache.kafka.streams.processor.AbstractProcessor;
|
||||||
import org.apache.kafka.streams.processor.ThreadMetadata;
|
import org.apache.kafka.streams.processor.ThreadMetadata;
|
||||||
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
|
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
|
||||||
|
@ -79,13 +74,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static java.util.Arrays.asList;
|
import static java.util.Arrays.asList;
|
||||||
import static org.easymock.EasyMock.anyObject;
|
|
||||||
import static org.easymock.EasyMock.eq;
|
|
||||||
import static org.easymock.EasyMock.expect;
|
|
||||||
import static org.easymock.EasyMock.expectLastCall;
|
|
||||||
import static org.easymock.EasyMock.mock;
|
|
||||||
import static org.easymock.EasyMock.replay;
|
|
||||||
import static org.easymock.EasyMock.verify;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
@ -737,131 +725,6 @@ public class KafkaStreamsTest {
|
||||||
startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, true);
|
startStreamsAndCheckDirExists(topology, asList(inputTopic, globalTopicName), outputTopic, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
|
||||||
public void shouldInitializeUserSerdes() {
|
|
||||||
final Deserializer mockSourceKeyDeserialzer = mock(Deserializer.class);
|
|
||||||
mockSourceKeyDeserialzer.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
final Deserializer mockSourceValueDeserialzer = mock(Deserializer.class);
|
|
||||||
mockSourceValueDeserialzer.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
final Serde mockSourceKeySerde = mock(Serde.class);
|
|
||||||
final Serde mockSourceValueSerde = mock(Serde.class);
|
|
||||||
expect(mockSourceKeySerde.deserializer()).andReturn(mockSourceKeyDeserialzer).anyTimes();
|
|
||||||
expect(mockSourceValueSerde.deserializer()).andReturn(mockSourceValueDeserialzer).anyTimes();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
final Serializer mockThroughKeySerializer = mock(Serializer.class);
|
|
||||||
mockThroughKeySerializer.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
final Serializer mockThroughValueSerializer = mock(Serializer.class);
|
|
||||||
mockThroughValueSerializer.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
final Deserializer mockThroughKeyDeserializer = mock(Deserializer.class);
|
|
||||||
mockThroughKeyDeserializer.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
final Deserializer mockThroughValueDeserializer = mock(Deserializer.class);
|
|
||||||
mockThroughValueDeserializer.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
final Serde mockThroughKeySerde = mock(Serde.class);
|
|
||||||
final Serde mockThroughValueSerde = mock(Serde.class);
|
|
||||||
expect(mockThroughKeySerde.serializer()).andReturn(mockThroughKeySerializer).anyTimes();
|
|
||||||
expect(mockThroughValueSerde.serializer()).andReturn(mockThroughValueSerializer).anyTimes();
|
|
||||||
expect(mockThroughKeySerde.deserializer()).andReturn(mockThroughKeyDeserializer).anyTimes();
|
|
||||||
expect(mockThroughValueSerde.deserializer()).andReturn(mockThroughValueDeserializer).anyTimes();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
final Serializer mockGroupedKeySerializer = mock(Serializer.class);
|
|
||||||
mockGroupedKeySerializer.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
final Serializer mockGroupedValueSerializer = mock(Serializer.class);
|
|
||||||
mockGroupedValueSerializer.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
final Deserializer mockGroupedKeyDeserializer = mock(Deserializer.class);
|
|
||||||
mockGroupedKeyDeserializer.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
final Deserializer mockGroupedValueDeserializer = mock(Deserializer.class);
|
|
||||||
mockGroupedValueDeserializer.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
final Serde mockGroupedKeySerde = mock(Serde.class);
|
|
||||||
final Serde mockGroupedValueSerde = mock(Serde.class);
|
|
||||||
expect(mockGroupedKeySerde.serializer()).andReturn(mockGroupedKeySerializer).anyTimes();
|
|
||||||
expect(mockGroupedValueSerde.serializer()).andReturn(mockGroupedValueSerializer).anyTimes();
|
|
||||||
expect(mockGroupedKeySerde.deserializer()).andReturn(mockGroupedKeyDeserializer).anyTimes();
|
|
||||||
expect(mockGroupedValueSerde.deserializer()).andReturn(mockGroupedValueDeserializer).anyTimes();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
final Serializer mockOutputKeySerializer = mock(Serializer.class);
|
|
||||||
mockOutputKeySerializer.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
final Serializer mockOutputValueSerializer = mock(Serializer.class);
|
|
||||||
mockOutputValueSerializer.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
final Serde mockOutputKeySerde = mock(Serde.class);
|
|
||||||
final Serde mockOutputValueSerde = mock(Serde.class);
|
|
||||||
expect(mockOutputKeySerde.serializer()).andReturn(mockOutputKeySerializer).anyTimes();
|
|
||||||
expect(mockOutputValueSerde.serializer()).andReturn(mockOutputValueSerializer).anyTimes();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
final Deserializer mockGlobalKeyDeserializer = mock(Deserializer.class);
|
|
||||||
mockGlobalKeyDeserializer.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
final Deserializer mockGlobalValueDeserializer = mock(Deserializer.class);
|
|
||||||
mockGlobalValueDeserializer.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
final Serde mockGlobalKeySerde = mock(Serde.class);
|
|
||||||
final Serde mockGlobalValueSerde = mock(Serde.class);
|
|
||||||
expect(mockGlobalKeySerde.deserializer()).andReturn(mockGlobalKeyDeserializer).anyTimes();
|
|
||||||
expect(mockGlobalValueSerde.deserializer()).andReturn(mockGlobalValueDeserializer).anyTimes();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
builder
|
|
||||||
.stream("anyTopic", Consumed.with(mockSourceKeySerde, mockSourceValueSerde))
|
|
||||||
.through("anyOtherTopic", Produced.with(mockThroughKeySerde, mockThroughValueSerde))
|
|
||||||
.selectKey(KeyValue::pair)
|
|
||||||
.groupByKey(Grouped.with(mockGroupedKeySerde, mockGroupedValueSerde))
|
|
||||||
.count()
|
|
||||||
.toStream()
|
|
||||||
.to("anyOutput", Produced.with(mockOutputKeySerde, mockOutputValueSerde));
|
|
||||||
builder.globalTable("anyGlobal", Consumed.with(mockGlobalKeySerde, mockGlobalValueSerde));
|
|
||||||
|
|
||||||
replay(
|
|
||||||
mockSourceKeyDeserialzer, mockSourceValueDeserialzer, mockSourceKeySerde, mockSourceValueSerde,
|
|
||||||
mockThroughKeySerializer, mockThroughKeyDeserializer, mockThroughKeySerde,
|
|
||||||
mockThroughValueSerializer, mockThroughValueDeserializer, mockThroughValueSerde,
|
|
||||||
mockGroupedKeySerializer, mockGroupedKeyDeserializer, mockGroupedKeySerde,
|
|
||||||
mockGroupedValueSerializer, mockGroupedValueDeserializer, mockGroupedValueSerde,
|
|
||||||
mockOutputKeySerializer, mockOutputValueSerializer, mockOutputKeySerde, mockOutputValueSerde,
|
|
||||||
mockGlobalKeyDeserializer, mockGlobalValueDeserializer, mockGlobalKeySerde, mockGlobalValueSerde);
|
|
||||||
|
|
||||||
KafkaStreams kafkaStreams = null;
|
|
||||||
try {
|
|
||||||
kafkaStreams = new KafkaStreams(builder.build(), props);
|
|
||||||
} finally {
|
|
||||||
if (kafkaStreams != null) {
|
|
||||||
kafkaStreams.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
verify(
|
|
||||||
mockSourceKeyDeserialzer, mockSourceValueDeserialzer,
|
|
||||||
mockThroughKeySerializer, mockThroughValueSerializer, mockThroughKeyDeserializer, mockThroughValueDeserializer,
|
|
||||||
mockGroupedKeySerializer, mockGroupedValueSerializer, mockGroupedKeyDeserializer, mockGroupedValueDeserializer,
|
|
||||||
mockOutputKeySerializer, mockOutputValueSerializer,
|
|
||||||
mockGlobalKeyDeserializer, mockGlobalValueDeserializer);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private Topology getStatefulTopology(final String inputTopic,
|
private Topology getStatefulTopology(final String inputTopic,
|
||||||
final String outputTopic,
|
final String outputTopic,
|
||||||
|
|
|
@ -21,7 +21,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.KafkaMetric;
|
import org.apache.kafka.common.metrics.KafkaMetric;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.serialization.Serde;
|
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
import org.apache.kafka.common.utils.Bytes;
|
import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
|
@ -40,7 +39,6 @@ import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -53,7 +51,6 @@ import static org.easymock.EasyMock.expect;
|
||||||
import static org.easymock.EasyMock.expectLastCall;
|
import static org.easymock.EasyMock.expectLastCall;
|
||||||
import static org.easymock.EasyMock.mock;
|
import static org.easymock.EasyMock.mock;
|
||||||
import static org.easymock.EasyMock.replay;
|
import static org.easymock.EasyMock.replay;
|
||||||
import static org.easymock.EasyMock.reset;
|
|
||||||
import static org.easymock.EasyMock.verify;
|
import static org.easymock.EasyMock.verify;
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
@ -95,7 +92,6 @@ public class MeteredKeyValueStoreTest {
|
||||||
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
|
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
|
||||||
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
|
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
|
||||||
expect(context.taskId()).andReturn(taskId);
|
expect(context.taskId()).andReturn(taskId);
|
||||||
expect(context.appConfigs()).andReturn(new HashMap<>());
|
|
||||||
expect(inner.name()).andReturn("metered").anyTimes();
|
expect(inner.name()).andReturn("metered").anyTimes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,53 +100,6 @@ public class MeteredKeyValueStoreTest {
|
||||||
metered.init(context, metered);
|
metered.init(context, metered);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
|
||||||
public void shouldGetSerdesFromConfigWithoutUserSerdes() {
|
|
||||||
metered = new MeteredKeyValueStore<>(
|
|
||||||
inner,
|
|
||||||
"scope",
|
|
||||||
new MockTime(),
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
final Serde mockSerde = mock(Serde.class);
|
|
||||||
replay(mockSerde);
|
|
||||||
expect(context.keySerde()).andReturn(mockSerde);
|
|
||||||
expect(context.valueSerde()).andReturn(mockSerde);
|
|
||||||
|
|
||||||
init();
|
|
||||||
verify(context, mockSerde);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void shouldConfigureUserSerdes() {
|
|
||||||
final Serde<String> mockKeySerde = mock(Serde.class);
|
|
||||||
mockKeySerde.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
final Serde<String> mockValueSerde = mock(Serde.class);
|
|
||||||
mockValueSerde.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
replay(mockKeySerde, mockValueSerde);
|
|
||||||
|
|
||||||
metered = new MeteredKeyValueStore<>(
|
|
||||||
inner,
|
|
||||||
"scope",
|
|
||||||
new MockTime(),
|
|
||||||
mockKeySerde,
|
|
||||||
mockValueSerde
|
|
||||||
);
|
|
||||||
|
|
||||||
reset(context);
|
|
||||||
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
|
|
||||||
expect(context.taskId()).andReturn(taskId).anyTimes();
|
|
||||||
|
|
||||||
init();
|
|
||||||
verify(context, mockKeySerde, mockValueSerde);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMetrics() {
|
public void testMetrics() {
|
||||||
init();
|
init();
|
||||||
|
|
|
@ -22,7 +22,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.KafkaMetric;
|
import org.apache.kafka.common.metrics.KafkaMetric;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.serialization.Serde;
|
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
import org.apache.kafka.common.utils.Bytes;
|
import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
|
@ -43,7 +42,6 @@ import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.apache.kafka.common.utils.Utils.mkEntry;
|
import static org.apache.kafka.common.utils.Utils.mkEntry;
|
||||||
|
@ -55,7 +53,6 @@ import static org.easymock.EasyMock.expect;
|
||||||
import static org.easymock.EasyMock.expectLastCall;
|
import static org.easymock.EasyMock.expectLastCall;
|
||||||
import static org.easymock.EasyMock.mock;
|
import static org.easymock.EasyMock.mock;
|
||||||
import static org.easymock.EasyMock.replay;
|
import static org.easymock.EasyMock.replay;
|
||||||
import static org.easymock.EasyMock.reset;
|
|
||||||
import static org.easymock.EasyMock.verify;
|
import static org.easymock.EasyMock.verify;
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
@ -94,7 +91,6 @@ public class MeteredSessionStoreTest {
|
||||||
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
|
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
|
||||||
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
|
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
|
||||||
expect(context.taskId()).andReturn(taskId);
|
expect(context.taskId()).andReturn(taskId);
|
||||||
expect(context.appConfigs()).andReturn(new HashMap<>());
|
|
||||||
expect(inner.name()).andReturn("metered").anyTimes();
|
expect(inner.name()).andReturn("metered").anyTimes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -103,53 +99,6 @@ public class MeteredSessionStoreTest {
|
||||||
metered.init(context, metered);
|
metered.init(context, metered);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
|
||||||
public void shouldGetSerdesFromConfigWithoutUserSerdes() {
|
|
||||||
metered = new MeteredSessionStore<>(
|
|
||||||
inner,
|
|
||||||
"scope",
|
|
||||||
null,
|
|
||||||
null,
|
|
||||||
new MockTime()
|
|
||||||
);
|
|
||||||
final Serde mockSerde = mock(Serde.class);
|
|
||||||
replay(mockSerde);
|
|
||||||
expect(context.keySerde()).andReturn(mockSerde);
|
|
||||||
expect(context.valueSerde()).andReturn(mockSerde);
|
|
||||||
|
|
||||||
init();
|
|
||||||
verify(context, mockSerde);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void shouldConfigureUserSerdes() {
|
|
||||||
final Serde<String> mockKeySerde = mock(Serde.class);
|
|
||||||
mockKeySerde.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
final Serde<String> mockValueSerde = mock(Serde.class);
|
|
||||||
mockValueSerde.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
replay(mockKeySerde, mockValueSerde);
|
|
||||||
|
|
||||||
metered = new MeteredSessionStore<>(
|
|
||||||
inner,
|
|
||||||
"scope",
|
|
||||||
mockKeySerde,
|
|
||||||
mockValueSerde,
|
|
||||||
new MockTime()
|
|
||||||
);
|
|
||||||
|
|
||||||
reset(context);
|
|
||||||
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
|
|
||||||
expect(context.taskId()).andReturn(taskId).anyTimes();
|
|
||||||
|
|
||||||
init();
|
|
||||||
verify(context, mockKeySerde, mockValueSerde);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMetrics() {
|
public void testMetrics() {
|
||||||
init();
|
init();
|
||||||
|
|
|
@ -19,14 +19,12 @@ package org.apache.kafka.streams.state.internals;
|
||||||
import org.apache.kafka.common.metrics.MetricConfig;
|
import org.apache.kafka.common.metrics.MetricConfig;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.serialization.Serde;
|
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
import org.apache.kafka.common.utils.Bytes;
|
import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.common.utils.LogContext;
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
import org.apache.kafka.streams.StreamsConfig;
|
||||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||||
import org.apache.kafka.streams.state.ValueAndTimestamp;
|
|
||||||
import org.apache.kafka.streams.state.WindowStore;
|
import org.apache.kafka.streams.state.WindowStore;
|
||||||
import org.apache.kafka.test.InternalMockProcessorContext;
|
import org.apache.kafka.test.InternalMockProcessorContext;
|
||||||
import org.apache.kafka.test.NoOpRecordCollector;
|
import org.apache.kafka.test.NoOpRecordCollector;
|
||||||
|
@ -36,19 +34,13 @@ import org.easymock.EasyMock;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.easymock.EasyMock.anyObject;
|
|
||||||
import static org.easymock.EasyMock.eq;
|
|
||||||
import static org.easymock.EasyMock.expectLastCall;
|
|
||||||
import static org.easymock.EasyMock.mock;
|
|
||||||
import static org.easymock.EasyMock.replay;
|
|
||||||
import static org.easymock.EasyMock.verify;
|
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
public class MeteredTimestampWindowStoreTest {
|
public class MeteredTimestampWindowStoreTest {
|
||||||
private InternalMockProcessorContext context;
|
private InternalMockProcessorContext context;
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
|
private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
|
||||||
private MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
|
private final MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
|
||||||
innerStoreMock,
|
innerStoreMock,
|
||||||
10L, // any size
|
10L, // any size
|
||||||
"scope",
|
"scope",
|
||||||
|
@ -77,73 +69,6 @@ public class MeteredTimestampWindowStoreTest {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void shouldGetSerdesFromConfigWithoutUserSerdes() {
|
|
||||||
store = new MeteredTimestampedWindowStore<>(
|
|
||||||
innerStoreMock,
|
|
||||||
10L, // any size
|
|
||||||
"scope",
|
|
||||||
new MockTime(),
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
|
|
||||||
final Serde mockSerde = mock(Serde.class);
|
|
||||||
|
|
||||||
new InternalMockProcessorContext(
|
|
||||||
TestUtils.tempDirectory(),
|
|
||||||
mockSerde,
|
|
||||||
mockSerde,
|
|
||||||
streamsMetrics,
|
|
||||||
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
|
|
||||||
NoOpRecordCollector::new,
|
|
||||||
new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
|
|
||||||
);
|
|
||||||
replay(mockSerde, innerStoreMock);
|
|
||||||
|
|
||||||
store.init(context, store);
|
|
||||||
|
|
||||||
verify(mockSerde);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void shouldConfigureUserSerdes() {
|
|
||||||
final Serde<String> mockKeySerde = mock(Serde.class);
|
|
||||||
mockKeySerde.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
final Serde<ValueAndTimestamp<String>> mockValueSerde = mock(Serde.class);
|
|
||||||
mockValueSerde.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
store = new MeteredTimestampedWindowStore<>(
|
|
||||||
innerStoreMock,
|
|
||||||
10L, // any size
|
|
||||||
"scope",
|
|
||||||
new MockTime(),
|
|
||||||
mockKeySerde,
|
|
||||||
mockValueSerde
|
|
||||||
);
|
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
|
|
||||||
final Serde mockSerde = mock(Serde.class);
|
|
||||||
|
|
||||||
new InternalMockProcessorContext(
|
|
||||||
TestUtils.tempDirectory(),
|
|
||||||
mockSerde,
|
|
||||||
mockSerde,
|
|
||||||
streamsMetrics,
|
|
||||||
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
|
|
||||||
NoOpRecordCollector::new,
|
|
||||||
new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
|
|
||||||
);
|
|
||||||
replay(mockKeySerde, mockValueSerde, innerStoreMock);
|
|
||||||
|
|
||||||
store.init(context, store);
|
|
||||||
verify(mockKeySerde, mockValueSerde);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldCloseUnderlyingStore() {
|
public void shouldCloseUnderlyingStore() {
|
||||||
innerStoreMock.close();
|
innerStoreMock.close();
|
||||||
|
|
|
@ -21,7 +21,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.KafkaMetric;
|
import org.apache.kafka.common.metrics.KafkaMetric;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.serialization.Serde;
|
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
import org.apache.kafka.common.utils.Bytes;
|
import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
|
@ -41,7 +40,6 @@ import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -54,7 +52,6 @@ import static org.easymock.EasyMock.expect;
|
||||||
import static org.easymock.EasyMock.expectLastCall;
|
import static org.easymock.EasyMock.expectLastCall;
|
||||||
import static org.easymock.EasyMock.mock;
|
import static org.easymock.EasyMock.mock;
|
||||||
import static org.easymock.EasyMock.replay;
|
import static org.easymock.EasyMock.replay;
|
||||||
import static org.easymock.EasyMock.reset;
|
|
||||||
import static org.easymock.EasyMock.verify;
|
import static org.easymock.EasyMock.verify;
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
@ -78,6 +75,7 @@ public class MeteredTimestampedKeyValueStoreTest {
|
||||||
private MeteredTimestampedKeyValueStore<String, String> metered;
|
private MeteredTimestampedKeyValueStore<String, String> metered;
|
||||||
private final String key = "key";
|
private final String key = "key";
|
||||||
private final Bytes keyBytes = Bytes.wrap(key.getBytes());
|
private final Bytes keyBytes = Bytes.wrap(key.getBytes());
|
||||||
|
private final String value = "value";
|
||||||
private final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("value", 97L);
|
private final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("value", 97L);
|
||||||
// timestamp is 97 what is ASCII of 'a'
|
// timestamp is 97 what is ASCII of 'a'
|
||||||
private final byte[] valueAndTimestampBytes = "\0\0\0\0\0\0\0avalue".getBytes();
|
private final byte[] valueAndTimestampBytes = "\0\0\0\0\0\0\0avalue".getBytes();
|
||||||
|
@ -96,7 +94,6 @@ public class MeteredTimestampedKeyValueStoreTest {
|
||||||
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
|
metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
|
||||||
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
|
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
|
||||||
expect(context.taskId()).andReturn(taskId);
|
expect(context.taskId()).andReturn(taskId);
|
||||||
expect(context.appConfigs()).andReturn(new HashMap<>());
|
|
||||||
expect(inner.name()).andReturn("metered").anyTimes();
|
expect(inner.name()).andReturn("metered").anyTimes();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,53 +102,6 @@ public class MeteredTimestampedKeyValueStoreTest {
|
||||||
metered.init(context, metered);
|
metered.init(context, metered);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Test
|
|
||||||
public void shouldGetSerdesFromConfigWithoutUserSerdes() {
|
|
||||||
metered = new MeteredTimestampedKeyValueStore<>(
|
|
||||||
inner,
|
|
||||||
"scope",
|
|
||||||
new MockTime(),
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
final Serde mockSerde = mock(Serde.class);
|
|
||||||
replay(mockSerde);
|
|
||||||
expect(context.keySerde()).andReturn(mockSerde);
|
|
||||||
expect(context.valueSerde()).andReturn(mockSerde);
|
|
||||||
|
|
||||||
init();
|
|
||||||
verify(context, mockSerde);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void shouldConfigureUserSerdes() {
|
|
||||||
final Serde<String> mockKeySerde = mock(Serde.class);
|
|
||||||
mockKeySerde.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
final Serde<ValueAndTimestamp<String>> mockValueSerde = mock(Serde.class);
|
|
||||||
mockValueSerde.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
replay(mockKeySerde, mockValueSerde);
|
|
||||||
|
|
||||||
metered = new MeteredTimestampedKeyValueStore<>(
|
|
||||||
inner,
|
|
||||||
"scope",
|
|
||||||
new MockTime(),
|
|
||||||
mockKeySerde,
|
|
||||||
mockValueSerde
|
|
||||||
);
|
|
||||||
|
|
||||||
reset(context);
|
|
||||||
expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
|
|
||||||
expect(context.taskId()).andReturn(taskId).anyTimes();
|
|
||||||
|
|
||||||
init();
|
|
||||||
verify(context, mockKeySerde, mockValueSerde);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMetrics() {
|
public void testMetrics() {
|
||||||
init();
|
init();
|
||||||
|
|
|
@ -22,12 +22,12 @@ import org.apache.kafka.common.metrics.JmxReporter;
|
||||||
import org.apache.kafka.common.metrics.MetricConfig;
|
import org.apache.kafka.common.metrics.MetricConfig;
|
||||||
import org.apache.kafka.common.metrics.Metrics;
|
import org.apache.kafka.common.metrics.Metrics;
|
||||||
import org.apache.kafka.common.metrics.Sensor;
|
import org.apache.kafka.common.metrics.Sensor;
|
||||||
import org.apache.kafka.common.serialization.Serde;
|
|
||||||
import org.apache.kafka.common.serialization.Serdes;
|
import org.apache.kafka.common.serialization.Serdes;
|
||||||
import org.apache.kafka.common.utils.Bytes;
|
import org.apache.kafka.common.utils.Bytes;
|
||||||
import org.apache.kafka.common.utils.LogContext;
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
import org.apache.kafka.common.utils.MockTime;
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.streams.StreamsConfig;
|
import org.apache.kafka.streams.StreamsConfig;
|
||||||
|
import org.apache.kafka.streams.kstream.Windowed;
|
||||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||||
import org.apache.kafka.streams.state.WindowStore;
|
import org.apache.kafka.streams.state.WindowStore;
|
||||||
import org.apache.kafka.test.InternalMockProcessorContext;
|
import org.apache.kafka.test.InternalMockProcessorContext;
|
||||||
|
@ -59,7 +59,7 @@ public class MeteredWindowStoreTest {
|
||||||
private InternalMockProcessorContext context;
|
private InternalMockProcessorContext context;
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private final WindowStore<Bytes, byte[]> innerStoreMock = createNiceMock(WindowStore.class);
|
private final WindowStore<Bytes, byte[]> innerStoreMock = createNiceMock(WindowStore.class);
|
||||||
private MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
|
private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
|
||||||
innerStoreMock,
|
innerStoreMock,
|
||||||
10L, // any size
|
10L, // any size
|
||||||
"scope",
|
"scope",
|
||||||
|
@ -88,73 +88,6 @@ public class MeteredWindowStoreTest {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void shouldGetSerdesFromConfigWithoutUserSerdes() {
|
|
||||||
store = new MeteredWindowStore<>(
|
|
||||||
innerStoreMock,
|
|
||||||
10L, // any size
|
|
||||||
"scope",
|
|
||||||
new MockTime(),
|
|
||||||
null,
|
|
||||||
null
|
|
||||||
);
|
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
|
|
||||||
final Serde mockSerde = mock(Serde.class);
|
|
||||||
|
|
||||||
new InternalMockProcessorContext(
|
|
||||||
TestUtils.tempDirectory(),
|
|
||||||
mockSerde,
|
|
||||||
mockSerde,
|
|
||||||
streamsMetrics,
|
|
||||||
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
|
|
||||||
NoOpRecordCollector::new,
|
|
||||||
new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
|
|
||||||
);
|
|
||||||
replay(mockSerde, innerStoreMock);
|
|
||||||
|
|
||||||
store.init(context, store);
|
|
||||||
|
|
||||||
verify(mockSerde);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void shouldConfigureUserSerdes() {
|
|
||||||
final Serde<String> mockKeySerde = mock(Serde.class);
|
|
||||||
mockKeySerde.configure(anyObject(), eq(true));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
final Serde<String> mockValueSerde = mock(Serde.class);
|
|
||||||
mockValueSerde.configure(anyObject(), eq(false));
|
|
||||||
expectLastCall();
|
|
||||||
|
|
||||||
store = new MeteredWindowStore<>(
|
|
||||||
innerStoreMock,
|
|
||||||
10L, // any size
|
|
||||||
"scope",
|
|
||||||
new MockTime(),
|
|
||||||
mockKeySerde,
|
|
||||||
mockValueSerde
|
|
||||||
);
|
|
||||||
|
|
||||||
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
|
|
||||||
final Serde mockSerde = mock(Serde.class);
|
|
||||||
|
|
||||||
new InternalMockProcessorContext(
|
|
||||||
TestUtils.tempDirectory(),
|
|
||||||
mockSerde,
|
|
||||||
mockSerde,
|
|
||||||
streamsMetrics,
|
|
||||||
new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
|
|
||||||
NoOpRecordCollector::new,
|
|
||||||
new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
|
|
||||||
);
|
|
||||||
replay(mockKeySerde, mockValueSerde, innerStoreMock);
|
|
||||||
|
|
||||||
store.init(context, store);
|
|
||||||
verify(mockKeySerde, mockValueSerde);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMetrics() {
|
public void testMetrics() {
|
||||||
replay(innerStoreMock);
|
replay(innerStoreMock);
|
||||||
|
@ -195,7 +128,7 @@ public class MeteredWindowStoreTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldRecordFetchLatency() {
|
public void shouldRecordFetchLatency() {
|
||||||
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyWindowStoreIterator());
|
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
|
||||||
replay(innerStoreMock);
|
replay(innerStoreMock);
|
||||||
|
|
||||||
store.init(context, store);
|
store.init(context, store);
|
||||||
|
@ -208,7 +141,7 @@ public class MeteredWindowStoreTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldRecordFetchRangeLatency() {
|
public void shouldRecordFetchRangeLatency() {
|
||||||
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyIterator());
|
expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
|
||||||
replay(innerStoreMock);
|
replay(innerStoreMock);
|
||||||
|
|
||||||
store.init(context, store);
|
store.init(context, store);
|
||||||
|
|
Loading…
Reference in New Issue