MINOR: code cleanup (#6053)

Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Ryanne Dolan <ryannedolan@gmail.com>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
Matthias J. Sax 2019-01-09 18:03:16 +01:00 committed by GitHub
parent 86de2dfd27
commit 1c7bf4e497
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 433 additions and 452 deletions

View File

@ -57,7 +57,7 @@ versions += [
jetty: "9.4.12.v20180830",
jersey: "2.27",
jmh: "1.21",
hamcrest: "1.3",
hamcrest: "2.1",
log4j: "1.2.17",
scalaLogging: "3.9.0",
jaxb: "2.3.0",
@ -119,7 +119,7 @@ libs += [
jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
junit: "junit:junit:$versions.junit",
hamcrest: "org.hamcrest:hamcrest-all:1.3",
hamcrest: "org.hamcrest:hamcrest:$versions.hamcrest",
kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",

View File

@ -113,7 +113,7 @@ public class StreamsBuilder {
* @return a {@link KStream} for the specified topics
*/
public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics) {
return stream(topics, Consumed.<K, V>with(null, null, null, null));
return stream(topics, Consumed.with(null, null, null, null));
}
/**
@ -155,7 +155,7 @@ public class StreamsBuilder {
* @return a {@link KStream} for topics matching the regex pattern.
*/
public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern) {
return stream(topicPattern, Consumed.<K, V>with(null, null));
return stream(topicPattern, Consumed.with(null, null));
}
/**
@ -250,7 +250,7 @@ public class StreamsBuilder {
* @return a {@link KTable} for the specified topic
*/
public synchronized <K, V> KTable<K, V> table(final String topic) {
return table(topic, new ConsumedInternal<K, V>());
return table(topic, new ConsumedInternal<>());
}
/**
@ -356,7 +356,7 @@ public class StreamsBuilder {
* @return a {@link GlobalKTable} for the specified topic
*/
public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic) {
return globalTable(topic, Consumed.<K, V>with(null, null));
return globalTable(topic, Consumed.with(null, null));
}
/**

View File

@ -115,7 +115,6 @@ public interface KGroupedStream<K, V> {
*/
KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);
/**
* Combine the values of records in this stream by the grouped key.
* Records with {@code null} key or value are ignored.

View File

@ -95,48 +95,35 @@ public abstract class AbstractStream<K, V> {
}
static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
return new ValueJoiner<T2, T1, R>() {
@Override
public R apply(final T2 value2, final T1 value1) {
return joiner.apply(value1, value2);
}
};
return (value2, value1) -> joiner.apply(value1, value2);
}
static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) {
Objects.requireNonNull(valueMapper, "valueMapper can't be null");
return new ValueMapperWithKey<K, V, VR>() {
@Override
public VR apply(final K readOnlyKey, final V value) {
return valueMapper.apply(value);
}
};
return (readOnlyKey, value) -> valueMapper.apply(value);
}
static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWithKeySupplier(
final ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
return new ValueTransformerWithKeySupplier<K, V, VR>() {
@Override
public ValueTransformerWithKey<K, V, VR> get() {
final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
return new ValueTransformerWithKey<K, V, VR>() {
@Override
public void init(final ProcessorContext context) {
valueTransformer.init(context);
}
return () -> {
final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
return new ValueTransformerWithKey<K, V, VR>() {
@Override
public void init(final ProcessorContext context) {
valueTransformer.init(context);
}
@Override
public VR transform(final K readOnlyKey, final V value) {
return valueTransformer.transform(value);
}
@Override
public VR transform(final K readOnlyKey, final V value) {
return valueTransformer.transform(value);
}
@Override
public void close() {
valueTransformer.close();
}
};
}
@Override
public void close() {
valueTransformer.close();
}
};
};
}

View File

@ -30,7 +30,9 @@ class ForwardingCacheFlushListener<K, V> implements CacheFlushListener<K, V> {
}
@Override
public void apply(final K key, final V newValue, final V oldValue) {
public void apply(final K key,
final V newValue,
final V oldValue) {
final ProcessorNode prev = context.currentNode();
context.setCurrentNode(myNode);
try {

View File

@ -28,12 +28,7 @@ import org.slf4j.LoggerFactory;
class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
private static final Logger LOG = LoggerFactory.getLogger(KTableKTableInnerJoin.class);
private final KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
@Override
public K apply(final K key, final V1 value) {
return key;
}
};
private final KeyValueMapper<K, V1, K> keyValueMapper = (key, value) -> key;
KTableKTableInnerJoin(final KTableImpl<K, ?, V1> table1,
final KTableImpl<K, ?, V2> table2,

View File

@ -38,9 +38,10 @@ public class KeyValueStoreMaterializer<K, V> {
final String name = materialized.storeName();
supplier = Stores.persistentKeyValueStore(name);
}
final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(supplier,
materialized.keySerde(),
materialized.valueSerde());
final StoreBuilder<KeyValueStore<K, V>> builder = Stores.keyValueStoreBuilder(
supplier,
materialized.keySerde(),
materialized.valueSerde());
if (materialized.loggingEnabled()) {
builder.withLoggingEnabled(materialized.logConfig());

View File

@ -35,16 +35,16 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
private final Joined<K, V1, V2> joined;
StreamStreamJoinNode(final String nodeName,
final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
final ProcessorParameters<K, V1> joinThisProcessorParameters,
final ProcessorParameters<K, V2> joinOtherProcessParameters,
final ProcessorParameters<K, VR> joinMergeProcessorParameters,
final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters,
final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters,
final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder,
final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder,
final Joined<K, V1, V2> joined) {
private StreamStreamJoinNode(final String nodeName,
final ValueJoiner<? super V1, ? super V2, ? extends VR> valueJoiner,
final ProcessorParameters<K, V1> joinThisProcessorParameters,
final ProcessorParameters<K, V2> joinOtherProcessParameters,
final ProcessorParameters<K, VR> joinMergeProcessorParameters,
final ProcessorParameters<K, V1> thisWindowedStreamProcessorParameters,
final ProcessorParameters<K, V2> otherWindowedStreamProcessorParameters,
final StoreBuilder<WindowStore<K, V1>> thisWindowStoreBuilder,
final StoreBuilder<WindowStore<K, V2>> otherWindowStoreBuilder,
final Joined<K, V1, V2> joined) {
super(nodeName,
valueJoiner,
@ -89,7 +89,7 @@ public class StreamStreamJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K
topologyBuilder.addStateStore(otherWindowStoreBuilder, otherWindowedStreamProcessorName, thisProcessorName);
}
public static <K, V, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR> streamStreamJoinNodeBuilder() {
public static <K, V1, V2, VR> StreamStreamJoinNodeBuilder<K, V1, V2, VR> streamStreamJoinNodeBuilder() {
return new StreamStreamJoinNodeBuilder<>();
}

View File

@ -66,7 +66,11 @@ public class TableProcessorNode<K, V, S extends StateStore> extends StreamsGraph
final boolean shouldMaterialize = materializedInternal != null && materializedInternal.queryableStoreName() != null;
if (shouldMaterialize) {
// TODO: we are enforcing this as a keyvalue store, but it should go beyond any type of stores
topologyBuilder.addStateStore(new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal).materialize(), processorName);
topologyBuilder.addStateStore(
new KeyValueStoreMaterializer<>(
(MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal
).materialize(),
processorName);
}
}
}

View File

@ -39,13 +39,13 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
private final String sourceName;
private final boolean isGlobalKTable;
TableSourceNode(final String nodeName,
final String sourceName,
final String topic,
final ConsumedInternal<K, V> consumedInternal,
final MaterializedInternal<K, V, ?> materializedInternal,
final ProcessorParameters<K, V> processorParameters,
final boolean isGlobalKTable) {
private TableSourceNode(final String nodeName,
final String sourceName,
final String topic,
final ConsumedInternal<K, V> consumedInternal,
final MaterializedInternal<K, V, ?> materializedInternal,
final ProcessorParameters<K, V> processorParameters,
final boolean isGlobalKTable) {
super(nodeName,
Collections.singletonList(topic),

View File

@ -50,13 +50,10 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
public class InternalTopologyBuilder {
private static final Logger log = LoggerFactory.getLogger(InternalTopologyBuilder.class);
private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
private static final String[] NO_PREDECESSORS = {};
// node factories in a topological order
@ -706,13 +703,15 @@ public class InternalTopologyBuilder {
// in the map; this scenario is possible, for example, that a state store underlying a source KTable is
// connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic.
if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
if (stateStoreNameToSourceTopics.containsKey(stateStoreName)
|| stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
return;
}
final Set<String> sourceTopics = new HashSet<>();
final Set<Pattern> sourcePatterns = new HashSet<>();
final Set<SourceNodeFactory> sourceNodesForPredecessor = findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
final Set<SourceNodeFactory> sourceNodesForPredecessor =
findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
for (final SourceNodeFactory sourceNodeFactory : sourceNodesForPredecessor) {
if (sourceNodeFactory.pattern != null) {
@ -1019,7 +1018,9 @@ public class InternalTopologyBuilder {
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
final String internalTopic = decorateTopic(topic);
repartitionTopics.put(internalTopic, new RepartitionTopicConfig(internalTopic, Collections.emptyMap()));
repartitionTopics.put(
internalTopic,
new RepartitionTopicConfig(internalTopic, Collections.emptyMap()));
sourceTopics.add(internalTopic);
} else {
sourceTopics.add(topic);
@ -1038,14 +1039,16 @@ public class InternalTopologyBuilder {
}
}
// if the node is connected to a state store whose changelog topics are not predefined, add to the changelog topics
// if the node is connected to a state store whose changelog topics are not predefined,
// add to the changelog topics
for (final StateStoreFactory stateFactory : stateFactories.values()) {
if (stateFactory.loggingEnabled() && stateFactory.users().contains(node)) {
final String topicName = storeToChangelogTopic.containsKey(stateFactory.name()) ?
storeToChangelogTopic.get(stateFactory.name()) :
ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.name());
if (!stateChangelogTopics.containsKey(topicName)) {
final InternalTopicConfig internalTopicConfig = createChangelogTopicConfig(stateFactory, topicName);
final InternalTopicConfig internalTopicConfig =
createChangelogTopicConfig(stateFactory, topicName);
stateChangelogTopics.put(topicName, internalTopicConfig);
}
}
@ -1066,7 +1069,8 @@ public class InternalTopologyBuilder {
// Adjust the generated topology based on the configs.
// Not exposed as public API and should be removed post 2.0
private void adjust(final StreamsConfig config) {
final boolean enableOptimization20 = config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
final boolean enableOptimization20 =
config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);
if (enableOptimization20) {
for (final Map.Entry<StoreBuilder, String> entry : storeToSourceChangelogTopic.entrySet()) {
@ -1084,9 +1088,12 @@ public class InternalTopologyBuilder {
private void setRegexMatchedTopicsToSourceNodes() {
if (subscriptionUpdates.hasUpdates()) {
for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
final SourceNodeFactory sourceNode =
(SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey());
//need to update nodeToSourceTopics with topics matched from given regex
nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates()));
nodeToSourceTopics.put(
stringPatternEntry.getKey(),
sourceNode.getTopics(subscriptionUpdates.getUpdates()));
log.debug("nodeToSourceTopics {}", nodeToSourceTopics);
}
}
@ -1108,7 +1115,9 @@ public class InternalTopologyBuilder {
if (storeTopics != null) {
updatedTopicsForStateStore.addAll(storeTopics);
}
stateStoreNameToSourceTopics.put(storePattern.getKey(), Collections.unmodifiableSet(updatedTopicsForStateStore));
stateStoreNameToSourceTopics.put(
storePattern.getKey(),
Collections.unmodifiableSet(updatedTopicsForStateStore));
}
}
}
@ -1205,11 +1214,11 @@ public class InternalTopologyBuilder {
return applicationId + "-" + topic;
}
public SubscriptionUpdates subscriptionUpdates() {
SubscriptionUpdates subscriptionUpdates() {
return subscriptionUpdates;
}
public synchronized Pattern sourceTopicPattern() {
synchronized Pattern sourceTopicPattern() {
if (topicPattern == null) {
final List<String> allSourceTopics = new ArrayList<>();
if (!nodeToSourceTopics.isEmpty()) {
@ -1263,7 +1272,9 @@ public class InternalTopologyBuilder {
return description;
}
private void describeGlobalStore(final TopologyDescription description, final Set<String> nodes, final int id) {
private void describeGlobalStore(final TopologyDescription description,
final Set<String> nodes,
final int id) {
final Iterator<String> it = nodes.iterator();
while (it.hasNext()) {
final String node = it.next();
@ -1314,7 +1325,8 @@ public class InternalTopologyBuilder {
private final static NodeComparator NODE_COMPARATOR = new NodeComparator();
private static void updateSize(final AbstractNode node, final int delta) {
private static void updateSize(final AbstractNode node,
final int delta) {
node.size += delta;
for (final TopologyDescription.Node predecessor : node.predecessors()) {
@ -1523,7 +1535,8 @@ public class InternalTopologyBuilder {
@Override
public String toString() {
return "Processor: " + name + " (stores: " + stores + ")\n --> " + nodeNames(successors) + "\n <-- " + nodeNames(predecessors);
return "Processor: " + name + " (stores: " + stores + ")\n --> "
+ nodeNames(successors) + "\n <-- " + nodeNames(predecessors);
}
@Override
@ -1592,7 +1605,8 @@ public class InternalTopologyBuilder {
if (topicNameExtractor instanceof StaticTopicNameExtractor) {
return "Sink: " + name + " (topic: " + topic() + ")\n <-- " + nodeNames(predecessors);
}
return "Sink: " + name + " (extractor class: " + topicNameExtractor + ")\n <-- " + nodeNames(predecessors);
return "Sink: " + name + " (extractor class: " + topicNameExtractor + ")\n <-- "
+ nodeNames(predecessors);
}
@Override
@ -1678,8 +1692,8 @@ public class InternalTopologyBuilder {
}
public static class TopicsInfo {
public final Set<String> sinkTopics;
public final Set<String> sourceTopics;
final Set<String> sinkTopics;
final Set<String> sourceTopics;
public final Map<String, InternalTopicConfig> stateChangelogTopics;
public final Map<String, InternalTopicConfig> repartitionSourceTopics;
@ -1775,10 +1789,8 @@ public class InternalTopologyBuilder {
int globalStoresIndex = sortedGlobalStores.length - 1;
while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
sb.append(" ");
final TopologyDescription.Subtopology subtopology =
sortedSubtopologies[subtopologiesIndex];
final TopologyDescription.GlobalStore globalStore =
sortedGlobalStores[globalStoresIndex];
final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex];
final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
if (subtopology.id() == expectedId) {
sb.append(subtopology);
subtopologiesIndex--;
@ -1789,15 +1801,13 @@ public class InternalTopologyBuilder {
expectedId++;
}
while (subtopologiesIndex != -1) {
final TopologyDescription.Subtopology subtopology =
sortedSubtopologies[subtopologiesIndex];
final TopologyDescription.Subtopology subtopology = sortedSubtopologies[subtopologiesIndex];
sb.append(" ");
sb.append(subtopology);
subtopologiesIndex--;
}
while (globalStoresIndex != -1) {
final TopologyDescription.GlobalStore globalStore =
sortedGlobalStores[globalStoresIndex];
final TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
sb.append(" ");
sb.append(globalStore);
globalStoresIndex--;
@ -1868,7 +1878,8 @@ public class InternalTopologyBuilder {
}
}
public void updateSubscribedTopics(final Set<String> topics, final String logPrefix) {
void updateSubscribedTopics(final Set<String> topics,
final String logPrefix) {
final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
log.debug("{}found {} topics possibly matching regex", logPrefix, topics);
// update the topic groups with the returned subscription set for regex pattern subscriptions

View File

@ -35,10 +35,10 @@ import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.streams.state.internals.WrappedStateStore.AbstractStateStore;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
@ -96,12 +96,12 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
if (!currentNode().stateStores.contains(name)) {
throw new StreamsException("Processor " + currentNode().name() + " has no access to StateStore " + name +
" as the store is not connected to the processor. If you add stores manually via '.addStateStore()' " +
"make sure to connect the added store to the processor by providing the processor name to " +
"'.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. " +
"DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' " +
"to connect the store to the corresponding operator. If you do not add stores manually, " +
"please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
" as the store is not connected to the processor. If you add stores manually via '.addStateStore()' " +
"make sure to connect the added store to the processor by providing the processor name to " +
"'.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. " +
"DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' " +
"to connect the store to the corresponding operator. If you do not add stores manually, " +
"please file a bug report at https://issues.apache.org/jira/projects/KAFKA.");
}
final StateStore store = stateManager.getStore(name);
@ -118,25 +118,35 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@SuppressWarnings("unchecked")
@Override
public <K, V> void forward(final K key, final V value) {
public <K, V> void forward(final K key,
final V value) {
forward(key, value, SEND_TO_ALL);
}
@SuppressWarnings({"unchecked", "deprecation"})
@Override
public <K, V> void forward(final K key, final V value, final int childIndex) {
forward(key, value, To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
public <K, V> void forward(final K key,
final V value,
final int childIndex) {
forward(
key,
value,
To.child(((List<ProcessorNode>) currentNode().children()).get(childIndex).name()));
}
@SuppressWarnings({"unchecked", "deprecation"})
@Override
public <K, V> void forward(final K key, final V value, final String childName) {
public <K, V> void forward(final K key,
final V value,
final String childName) {
forward(key, value, To.child(childName));
}
@SuppressWarnings("unchecked")
@Override
public <K, V> void forward(final K key, final V value, final To to) {
public <K, V> void forward(final K key,
final V value,
final To to) {
toInternal.update(to);
if (toInternal.hasTimestamp()) {
recordContext.setTimestamp(toInternal.timestamp());
@ -148,8 +158,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
if (sendTo != null) {
final ProcessorNode child = currentNode().getChild(sendTo);
if (child == null) {
throw new StreamsException("Unknown downstream node: " + sendTo + " either does not exist or is not" +
" connected to this processor.");
throw new StreamsException("Unknown downstream node: " + sendTo
+ " either does not exist or is not connected to this processor.");
}
forward(child, key, value);
} else {
@ -182,7 +192,9 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@Override
@Deprecated
public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator callback) {
public Cancellable schedule(final long interval,
final PunctuationType type,
final Punctuator callback) {
if (interval < 1) {
throw new IllegalArgumentException("The minimum supported scheduling interval is 1 millisecond.");
}
@ -210,7 +222,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
private abstract static class StateStoreReadOnlyDecorator<T extends StateStore> extends AbstractStateStore {
static final String ERROR_MESSAGE = "Global store is read only";
StateStoreReadOnlyDecorator(final T inner) {
private StateStoreReadOnlyDecorator(final T inner) {
super(inner);
}
@ -225,7 +237,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
public void init(final ProcessorContext context,
final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@ -235,8 +248,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
private static class KeyValueStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> {
KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
private static class KeyValueStoreReadOnlyDecorator<K, V>
extends StateStoreReadOnlyDecorator<KeyValueStore<K, V>>
implements KeyValueStore<K, V> {
private KeyValueStoreReadOnlyDecorator(final KeyValueStore<K, V> inner) {
super(inner);
}
@ -246,7 +262,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
public KeyValueIterator<K, V> range(final K from,
final K to) {
return getInner().range(from, to);
}
@ -261,12 +278,14 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
@Override
public void put(final K key, final V value) {
public void put(final K key,
final V value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public V putIfAbsent(final K key, final V value) {
public V putIfAbsent(final K key,
final V value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@ -281,35 +300,47 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
private static class WindowStoreReadOnlyDecorator<K, V> extends StateStoreReadOnlyDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
private static class WindowStoreReadOnlyDecorator<K, V>
extends StateStoreReadOnlyDecorator<WindowStore<K, V>>
implements WindowStore<K, V> {
private WindowStoreReadOnlyDecorator(final WindowStore<K, V> inner) {
super(inner);
}
@Override
public void put(final K key, final V value) {
public void put(final K key,
final V value) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public void put(final K key, final V value, final long windowStartTimestamp) {
public void put(final K key,
final V value,
final long windowStartTimestamp) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public V fetch(final K key, final long time) {
public V fetch(final K key,
final long time) {
return getInner().fetch(key, time);
}
@Deprecated
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
public WindowStoreIterator<V> fetch(final K key,
final long timeFrom,
final long timeTo) {
return getInner().fetch(key, timeFrom, timeTo);
}
@Deprecated
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
final K to,
final long timeFrom,
final long timeTo) {
return getInner().fetch(from, to, timeFrom, timeTo);
}
@ -320,23 +351,32 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@Deprecated
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
return getInner().fetchAll(timeFrom, timeTo);
}
}
private static class SessionStoreReadOnlyDecorator<K, AGG> extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> {
SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
private static class SessionStoreReadOnlyDecorator<K, AGG>
extends StateStoreReadOnlyDecorator<SessionStore<K, AGG>>
implements SessionStore<K, AGG> {
private SessionStoreReadOnlyDecorator(final SessionStore<K, AGG> inner) {
super(inner);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
return getInner().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
final K keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
return getInner().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
}
@ -346,7 +386,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
@Override
public void put(final Windowed<K> sessionKey, final AGG aggregate) {
public void put(final Windowed<K> sessionKey,
final AGG aggregate) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@ -356,7 +397,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
final K to) {
return getInner().fetch(from, to);
}
}
@ -364,7 +406,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
private abstract static class StateStoreReadWriteDecorator<T extends StateStore> extends AbstractStateStore {
static final String ERROR_MESSAGE = "This method may only be called by Kafka Streams";
StateStoreReadWriteDecorator(final T inner) {
private StateStoreReadWriteDecorator(final T inner) {
super(inner);
}
@ -374,7 +416,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
public void init(final ProcessorContext context,
final StateStore root) {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@ -384,8 +427,11 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
private static class KeyValueStoreReadWriteDecorator<K, V> extends StateStoreReadWriteDecorator<KeyValueStore<K, V>> implements KeyValueStore<K, V> {
KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
private static class KeyValueStoreReadWriteDecorator<K, V>
extends StateStoreReadWriteDecorator<KeyValueStore<K, V>>
implements KeyValueStore<K, V> {
private KeyValueStoreReadWriteDecorator(final KeyValueStore<K, V> inner) {
super(inner);
}
@ -395,7 +441,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
public KeyValueIterator<K, V> range(final K from,
final K to) {
return wrapped().range(from, to);
}
@ -410,12 +457,14 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
@Override
public void put(final K key, final V value) {
public void put(final K key,
final V value) {
wrapped().put(key, value);
}
@Override
public V putIfAbsent(final K key, final V value) {
public V putIfAbsent(final K key,
final V value) {
return wrapped().putIfAbsent(key, value);
}
@ -430,35 +479,47 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
}
private static class WindowStoreReadWriteDecorator<K, V> extends StateStoreReadWriteDecorator<WindowStore<K, V>> implements WindowStore<K, V> {
WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
private static class WindowStoreReadWriteDecorator<K, V>
extends StateStoreReadWriteDecorator<WindowStore<K, V>>
implements WindowStore<K, V> {
private WindowStoreReadWriteDecorator(final WindowStore<K, V> inner) {
super(inner);
}
@Override
public void put(final K key, final V value) {
public void put(final K key,
final V value) {
wrapped().put(key, value);
}
@Override
public void put(final K key, final V value, final long windowStartTimestamp) {
public void put(final K key,
final V value,
final long windowStartTimestamp) {
wrapped().put(key, value, windowStartTimestamp);
}
@Override
public V fetch(final K key, final long time) {
public V fetch(final K key,
final long time) {
return wrapped().fetch(key, time);
}
@Deprecated
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
public WindowStoreIterator<V> fetch(final K key,
final long timeFrom,
final long timeTo) {
return wrapped().fetch(key, timeFrom, timeTo);
}
@Deprecated
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
final K to,
final long timeFrom,
final long timeTo) {
return wrapped().fetch(from, to, timeFrom, timeTo);
}
@ -469,23 +530,32 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
@Deprecated
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
return wrapped().fetchAll(timeFrom, timeTo);
}
}
private static class SessionStoreReadWriteDecorator<K, AGG> extends StateStoreReadWriteDecorator<SessionStore<K, AGG>> implements SessionStore<K, AGG> {
SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
private static class SessionStoreReadWriteDecorator<K, AGG>
extends StateStoreReadWriteDecorator<SessionStore<K, AGG>>
implements SessionStore<K, AGG> {
private SessionStoreReadWriteDecorator(final SessionStore<K, AGG> inner) {
super(inner);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key, final long earliestSessionEndTime, final long latestSessionStartTime) {
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
return wrapped().findSessions(key, earliestSessionEndTime, latestSessionStartTime);
}
@Override
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom, final K keyTo, final long earliestSessionEndTime, final long latestSessionStartTime) {
public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
final K keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
return wrapped().findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
}
@ -505,7 +575,8 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
}
@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from, final K to) {
public KeyValueIterator<Windowed<K>, AGG> fetch(final K from,
final K to) {
return wrapped().fetch(from, to);
}
}

View File

@ -96,7 +96,8 @@ public class ProcessorStateManager extends AbstractStateManager {
}
public static String storeChangelogTopic(final String applicationId, final String storeName) {
public static String storeChangelogTopic(final String applicationId,
final String storeName) {
return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}
@ -133,12 +134,13 @@ public class ProcessorStateManager extends AbstractStateManager {
restoreCallbacks.put(topic, stateRestoreCallback);
} else {
log.trace("Restoring state store {} from changelog topic {}", storeName, topic);
final StateRestorer restorer = new StateRestorer(storePartition,
new CompositeRestoreListener(stateRestoreCallback),
checkpointableOffsets.get(storePartition),
offsetLimit(storePartition),
store.persistent(),
storeName);
final StateRestorer restorer = new StateRestorer(
storePartition,
new CompositeRestoreListener(stateRestoreCallback),
checkpointableOffsets.get(storePartition),
offsetLimit(storePartition),
store.persistent(),
storeName);
changelogReader.register(restorer);
}
@ -190,7 +192,8 @@ public class ProcessorStateManager extends AbstractStateManager {
standbyRestoredOffsets.put(storePartition, lastOffset + 1);
}
void putOffsetLimit(final TopicPartition partition, final long limit) {
void putOffsetLimit(final TopicPartition partition,
final long limit) {
log.trace("Updating store offset limit for partition {} to {}", partition, limit);
offsetLimits.put(partition, limit);
}

View File

@ -22,7 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
@ -171,7 +171,7 @@ public class SubscriptionInfo {
if (userEndPoint == null) {
return new byte[0];
} else {
return userEndPoint.getBytes(Charset.forName("UTF-8"));
return userEndPoint.getBytes(StandardCharsets.UTF_8);
}
}
@ -318,7 +318,7 @@ public class SubscriptionInfo {
if (bytesLength != 0) {
final byte[] bytes = new byte[bytesLength];
data.get(bytes);
subscriptionInfo.userEndPoint = new String(bytes, Charset.forName("UTF-8"));
subscriptionInfo.userEndPoint = new String(bytes, StandardCharsets.UTF_8);
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.internals.CompositeReadOnlySessionStore;
@ -23,37 +25,40 @@ import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
/**
* Provides access to the {@link QueryableStoreType}s provided with KafkaStreams. These
* can be used with {@link org.apache.kafka.streams.KafkaStreams#store(String, QueryableStoreType)}
* To access and query the {@link StateStore}s that are part of a Topology
* Provides access to the {@link QueryableStoreType}s provided with {@link KafkaStreams}.
* These can be used with {@link KafkaStreams#store(String, QueryableStoreType)}.
* To access and query the {@link StateStore}s that are part of a {@link Topology}.
*/
public class QueryableStoreTypes {
public final class QueryableStoreTypes {
/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore}
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link QueryableStoreTypes.KeyValueStoreType}
* A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore}.
*
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link QueryableStoreTypes.KeyValueStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, V>> keyValueStore() {
return new KeyValueStoreType<>();
}
/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link QueryableStoreTypes.WindowStoreType}
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}.
*
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link QueryableStoreTypes.WindowStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, V>> windowStore() {
return new WindowStoreType<>();
}
/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link QueryableStoreTypes.SessionStoreType}
* A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}.
*
* @param <K> key type of the store
* @param <V> value type of the store
* @return {@link QueryableStoreTypes.SessionStoreType}
*/
public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, V>> sessionStore() {
return new SessionStoreType<>();
@ -104,7 +109,8 @@ public class QueryableStoreTypes {
super(ReadOnlySessionStore.class);
}
@Override
public ReadOnlySessionStore<K, V> create(final StateStoreProvider storeProvider, final String storeName) {
public ReadOnlySessionStore<K, V> create(final StateStoreProvider storeProvider,
final String storeName) {
return new CompositeReadOnlySessionStore<>(storeProvider, this, storeName);
}
}

View File

@ -43,16 +43,16 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* @param key The key to associate the value to
* @param value The value to update, it can be null;
* if the serialized bytes are also null it is interpreted as deletes
* @throws NullPointerException If null is used for key.
* @throws NullPointerException if the given key is {@code null}
*/
void put(K key, V value);
/**
* Put a key-value pair with the given timestamp into the corresponding window
* Put a key-value pair into the window with given window start timestamp
* @param key The key to associate the value to
* @param value The value; can be null
* @param windowStartTimestamp The timestamp of the beginning of the window to put the key/value into
* @throws NullPointerException If null is used for key.
* @throws NullPointerException if the given key is {@code null}
*/
void put(K key, V value, long windowStartTimestamp);
@ -87,7 +87,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* @param timeTo time range end (inclusive)
* @return an iterator over key-value pairs {@code <timestamp, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for key.
* @throws NullPointerException if the given key is {@code null}
*/
@SuppressWarnings("deprecation")
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
@ -111,7 +111,7 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* @param timeTo time range end (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException If {@code null} is used for any key.
* @throws NullPointerException if one of the given keys is {@code null}
*/
@SuppressWarnings("deprecation")
KeyValueIterator<Windowed<K>, V> fetch(K from, K to, long timeFrom, long timeTo);
@ -132,7 +132,6 @@ public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V>
* @param timeTo the end of the time slot from which to search (inclusive)
* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}
* @throws InvalidStateStoreException if the store is not initialized
* @throws NullPointerException if {@code null} is used for any key
*/
@SuppressWarnings("deprecation")
KeyValueIterator<Windowed<K>, V> fetchAll(long timeFrom, long timeTo);

View File

@ -59,7 +59,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
public void init(final ProcessorContext context,
final StateStore root) {
initInternal(context);
underlying.init(context, root);
// save the stream thread as we only ever want to trigger a flush
@ -76,17 +77,15 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
this.cache = this.context.getCache();
this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), underlying.name());
cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> entries) {
for (final ThreadCache.DirtyEntry entry : entries) {
putAndMaybeForward(entry, (InternalProcessorContext) context);
}
cache.addDirtyEntryFlushListener(cacheName, entries -> {
for (final ThreadCache.DirtyEntry entry : entries) {
putAndMaybeForward(entry, (InternalProcessorContext) context);
}
});
}
private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) {
private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
final InternalProcessorContext context) {
final ProcessorRecordContext current = context.recordContext();
try {
context.setRecordContext(entry.entry().context());
@ -190,7 +189,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
}
@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
validateStoreOpen();
final KeyValueIterator<Bytes, byte[]> storeIterator = underlying.range(from, to);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, from, to);
@ -217,7 +217,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
}
@Override
public void put(final Bytes key, final byte[] value) {
public void put(final Bytes key,
final byte[] value) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
lock.writeLock().lock();
@ -229,7 +230,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
}
}
private void putInternal(final Bytes key, final byte[] value) {
private void putInternal(final Bytes key,
final byte[] value) {
cache.put(
cacheName,
key,
@ -244,7 +246,8 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
}
@Override
public byte[] putIfAbsent(final Bytes key, final byte[] value) {
public byte[] putIfAbsent(final Bytes key,
final byte[] value) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
lock.writeLock().lock();

View File

@ -38,19 +38,17 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
}
@Override
public void init(final ProcessorContext context, final StateStore root) {
public void init(final ProcessorContext context,
final StateStore root) {
inner.init(context, root);
final String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name());
this.changeLogger = new StoreChangeLogger<>(inner.name(), context, new StateSerdes<>(topic, Serdes.Bytes(), Serdes.ByteArray()));
// if the inner store is an LRU cache, add the eviction listener to log removed record
if (inner instanceof MemoryLRUCache) {
((MemoryLRUCache<Bytes, byte[]>) inner).whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<Bytes, byte[]>() {
@Override
public void apply(final Bytes key, final byte[] value) {
// pass null to indicate removal
changeLogger.logChange(key, null);
}
((MemoryLRUCache<Bytes, byte[]>) inner).setWhenEldestRemoved((key, value) -> {
// pass null to indicate removal
changeLogger.logChange(key, null);
});
}
}
@ -61,13 +59,15 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
}
@Override
public void put(final Bytes key, final byte[] value) {
public void put(final Bytes key,
final byte[] value) {
inner.put(key, value);
changeLogger.logChange(key, value);
}
@Override
public byte[] putIfAbsent(final Bytes key, final byte[] value) {
public byte[] putIfAbsent(final Bytes key,
final byte[] value) {
final byte[] previous = get(key);
if (previous == null) {
put(key, value);
@ -96,7 +96,8 @@ public class ChangeLoggingKeyValueBytesStore extends WrappedStateStore.AbstractS
}
@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
return inner.range(from, to);
}

View File

@ -1,133 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import java.util.List;
public class InMemoryKeyValueLoggedStore<K, V> extends WrappedStateStore.AbstractStateStore implements KeyValueStore<K, V> {
private final KeyValueStore<K, V> inner;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private StoreChangeLogger<K, V> changeLogger;
public InMemoryKeyValueLoggedStore(final KeyValueStore<K, V> inner, final Serde<K> keySerde, final Serde<V> valueSerde) {
super(inner);
this.inner = inner;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
}
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context, final StateStore root) {
inner.init(context, root);
// construct the serde
final StateSerdes<K, V> serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), inner.name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
this.changeLogger = new StoreChangeLogger<>(inner.name(), context, serdes);
// if the inner store is an LRU cache, add the eviction listener to log removed record
if (inner instanceof MemoryLRUCache) {
((MemoryLRUCache<K, V>) inner).whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
@Override
public void apply(final K key, final V value) {
removed(key);
}
});
}
}
@Override
public long approximateNumEntries() {
return inner.approximateNumEntries();
}
@Override
public V get(final K key) {
return this.inner.get(key);
}
@Override
public void put(final K key, final V value) {
this.inner.put(key, value);
changeLogger.logChange(key, value);
}
@Override
public V putIfAbsent(final K key, final V value) {
final V originalValue = this.inner.putIfAbsent(key, value);
if (originalValue == null) {
changeLogger.logChange(key, value);
}
return originalValue;
}
@Override
public void putAll(final List<KeyValue<K, V>> entries) {
this.inner.putAll(entries);
for (final KeyValue<K, V> entry : entries) {
final K key = entry.key;
changeLogger.logChange(key, entry.value);
}
}
@Override
public V delete(final K key) {
final V value = this.inner.delete(key);
removed(key);
return value;
}
/**
* Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
* store.
*
* @param key the key for the entry that the inner store removed
*/
protected void removed(final K key) {
changeLogger.logChange(key, null);
}
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
return this.inner.range(from, to);
}
@Override
public KeyValueIterator<K, V> all() {
return this.inner.all();
}
}

View File

@ -52,10 +52,6 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.map = new TreeMap<>();
}
public KeyValueStore<K, V> enableLogging() {
return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde);
}
@Override
public String name() {
return this.name;

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Objects;
public class KeyValueStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, KeyValueStore<K, V>> {

View File

@ -43,18 +43,17 @@ import java.util.Objects;
public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
public interface EldestEntryRemovalListener<K, V> {
void apply(K key, V value);
}
private final Serde<K> keySerde;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final String name;
protected final Map<K, V> map;
private StateSerdes<K, V> serdes;
private boolean restoring = false; // TODO: this is a sub-optimal solution to avoid logging during restoration.
// in the future we should augment the StateRestoreCallback with onComplete etc to better resolve this.
private boolean restoring = false; // TODO: this is a sub-optimal solution to avoid logging during restoration.
// in the future we should augment the StateRestoreCallback with onComplete etc to better resolve this.
private volatile boolean open = true;
private EldestEntryRemovalListener<K, V> listener;
@ -82,14 +81,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
};
}
KeyValueStore<K, V> enableLogging() {
return new InMemoryKeyValueLoggedStore<>(this, keySerde, valueSerde);
}
MemoryLRUCache<K, V> whenEldestRemoved(final EldestEntryRemovalListener<K, V> listener) {
void setWhenEldestRemoved(final EldestEntryRemovalListener<K, V> listener) {
this.listener = listener;
return this;
}
@Override
@ -99,7 +92,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context, final StateStore root) {
public void init(final ProcessorContext context,
final StateStore root) {
// construct the serde
this.serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
@ -137,7 +131,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
}
@Override
public synchronized void put(final K key, final V value) {
public synchronized void put(final K key,
final V value) {
Objects.requireNonNull(key);
if (value == null) {
this.map.remove(key);
@ -147,7 +142,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
}
@Override
public synchronized V putIfAbsent(final K key, final V value) {
public synchronized V putIfAbsent(final K key,
final V value) {
Objects.requireNonNull(key);
final V originalValue = get(key);
if (originalValue == null) {
@ -173,7 +169,8 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
* @throws UnsupportedOperationException at every invocation
*/
@Override
public KeyValueIterator<K, V> range(final K from, final K to) {
public KeyValueIterator<K, V> range(final K from,
final K to) {
throw new UnsupportedOperationException("MemoryLRUCache does not support range() function.");
}

View File

@ -82,14 +82,14 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
@Override
public void init(final ProcessorContext context,
final StateStore root) {
this.metrics = (StreamsMetricsImpl) context.metrics();
metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString();
final String metricsGroup = "stream-" + metricScope + "-metrics";
final Map<String, String> taskTags = metrics.tagMap("task-id", taskName, metricScope + "-id", "all");
final Map<String, String> storeTags = metrics.tagMap("task-id", taskName, metricScope + "-id", name());
this.serdes = new StateSerdes<>(
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
@ -132,9 +132,9 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
public V get(final K key) {
try {
if (getTime.shouldRecord()) {
return measureLatency(() -> outerValue(inner.get(Bytes.wrap(serdes.rawKey(key)))), getTime);
return measureLatency(() -> outerValue(inner.get(keyBytes(key))), getTime);
} else {
return outerValue(inner.get(Bytes.wrap(serdes.rawKey(key))));
return outerValue(inner.get(keyBytes(key)));
}
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key);
@ -148,11 +148,11 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
try {
if (putTime.shouldRecord()) {
measureLatency(() -> {
inner.put(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value));
inner.put(keyBytes(key), serdes.rawValue(value));
return null;
}, putTime);
} else {
inner.put(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value));
inner.put(keyBytes(key), serdes.rawValue(value));
}
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key, value);
@ -165,10 +165,10 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
final V value) {
if (putIfAbsentTime.shouldRecord()) {
return measureLatency(
() -> outerValue(inner.putIfAbsent(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value))),
() -> outerValue(inner.putIfAbsent(keyBytes(key), serdes.rawValue(value))),
putIfAbsentTime);
} else {
return outerValue(inner.putIfAbsent(Bytes.wrap(serdes.rawKey(key)), serdes.rawValue(value)));
return outerValue(inner.putIfAbsent(keyBytes(key), serdes.rawValue(value)));
}
}
@ -190,9 +190,9 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
public V delete(final K key) {
try {
if (deleteTime.shouldRecord()) {
return measureLatency(() -> outerValue(inner.delete(Bytes.wrap(serdes.rawKey(key)))), deleteTime);
return measureLatency(() -> outerValue(inner.delete(keyBytes(key))), deleteTime);
} else {
return outerValue(inner.delete(Bytes.wrap(serdes.rawKey(key))));
return outerValue(inner.delete(keyBytes(key)));
}
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key);
@ -204,13 +204,13 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
public KeyValueIterator<K, V> range(final K from,
final K to) {
return new MeteredKeyValueIterator(
this.inner.range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))),
this.rangeTime);
inner.range(Bytes.wrap(serdes.rawKey(from)), Bytes.wrap(serdes.rawKey(to))),
rangeTime);
}
@Override
public KeyValueIterator<K, V> all() {
return new MeteredKeyValueIterator(this.inner.all(), this.allTime);
return new MeteredKeyValueIterator(inner.all(), allTime);
}
@Override
@ -245,6 +245,10 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
return value == null ? null : serdes.valueFrom(value);
}
private Bytes keyBytes(final K key) {
return Bytes.wrap(serdes.rawKey(key));
}
private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K, V>> from) {
final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
for (final KeyValue<K, V> entry : from) {
@ -289,7 +293,7 @@ public class MeteredKeyValueStore<K, V> extends WrappedStateStore.AbstractStateS
try {
iter.close();
} finally {
metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
metrics.recordLatency(sensor, startNs, time.nanoseconds());
}
}

View File

@ -65,12 +65,14 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context, final StateStore root) {
public void init(final ProcessorContext context,
final StateStore root) {
//noinspection unchecked
this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
this.metrics = (StreamsMetricsImpl) context.metrics();
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString();
final String metricsGroup = "stream-" + metricScope + "-metrics";
@ -88,7 +90,7 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
try {
inner.init(context, root);
} finally {
this.metrics.recordLatency(
metrics.recordLatency(
restoreTime,
startNs,
time.nanoseconds()
@ -109,13 +111,15 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
final long latestSessionStartTime) {
Objects.requireNonNull(key, "key cannot be null");
final Bytes bytesKey = keyBytes(key);
return new MeteredWindowedKeyValueIterator<>(inner.findSessions(bytesKey,
earliestSessionEndTime,
latestSessionStartTime),
fetchTime,
metrics,
serdes,
time);
return new MeteredWindowedKeyValueIterator<>(
inner.findSessions(
bytesKey,
earliestSessionEndTime,
latestSessionStartTime),
fetchTime,
metrics,
serdes,
time);
}
@Override
@ -127,14 +131,16 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
Objects.requireNonNull(keyTo, "keyTo cannot be null");
final Bytes bytesKeyFrom = keyBytes(keyFrom);
final Bytes bytesKeyTo = keyBytes(keyTo);
return new MeteredWindowedKeyValueIterator<>(inner.findSessions(bytesKeyFrom,
bytesKeyTo,
earliestSessionEndTime,
latestSessionStartTime),
fetchTime,
metrics,
serdes,
time);
return new MeteredWindowedKeyValueIterator<>(
inner.findSessions(
bytesKeyFrom,
bytesKeyTo,
earliestSessionEndTime,
latestSessionStartTime),
fetchTime,
metrics,
serdes,
time);
}
@Override
@ -148,22 +154,23 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
final String message = String.format(e.getMessage(), sessionKey.key());
throw new ProcessorStateException(message, e);
} finally {
this.metrics.recordLatency(removeTime, startNs, time.nanoseconds());
metrics.recordLatency(removeTime, startNs, time.nanoseconds());
}
}
@Override
public void put(final Windowed<K> sessionKey, final V aggregate) {
public void put(final Windowed<K> sessionKey,
final V aggregate) {
Objects.requireNonNull(sessionKey, "sessionKey can't be null");
final long startNs = time.nanoseconds();
try {
final Bytes key = keyBytes(sessionKey.key());
this.inner.put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate));
inner.put(new Windowed<>(key, sessionKey.window()), serdes.rawValue(aggregate));
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), sessionKey.key(), aggregate);
throw new ProcessorStateException(message, e);
} finally {
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
metrics.recordLatency(putTime, startNs, time.nanoseconds());
}
}
@ -178,7 +185,8 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
}
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to) {
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
final K to) {
Objects.requireNonNull(from, "from cannot be null");
Objects.requireNonNull(to, "to cannot be null");
return findSessions(from, to, 0, Long.MAX_VALUE);
@ -188,9 +196,9 @@ public class MeteredSessionStore<K, V> extends WrappedStateStore.AbstractStateSt
public void flush() {
final long startNs = time.nanoseconds();
try {
this.inner.flush();
inner.flush();
} finally {
this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
metrics.recordLatency(flushTime, startNs, time.nanoseconds());
}
}
}

View File

@ -66,12 +66,14 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context, final StateStore root) {
public void init(final ProcessorContext context,
final StateStore root) {
this.context = context;
this.serdes = new StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
this.metrics = (StreamsMetricsImpl) context.metrics();
serdes = new StateSerdes<>(
ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
metrics = (StreamsMetricsImpl) context.metrics();
taskName = context.taskId().toString();
final String metricsGroup = "stream-" + metricScope + "-metrics";
@ -88,7 +90,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
try {
inner.init(context, root);
} finally {
this.metrics.recordLatency(
metrics.recordLatency(
restoreTime,
startNs,
time.nanoseconds()
@ -103,12 +105,15 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
public void put(final K key, final V value) {
public void put(final K key,
final V value) {
put(key, value, context.timestamp());
}
@Override
public void put(final K key, final V value, final long windowStartTimestamp) {
public void put(final K key,
final V value,
final long windowStartTimestamp) {
final long startNs = time.nanoseconds();
try {
inner.put(keyBytes(key), serdes.rawValue(value), windowStartTimestamp);
@ -116,7 +121,7 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
final String message = String.format(e.getMessage(), key, value);
throw new ProcessorStateException(message, e);
} finally {
metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
metrics.recordLatency(putTime, startNs, time.nanoseconds());
}
}
@ -125,7 +130,8 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
@Override
public V fetch(final K key, final long timestamp) {
public V fetch(final K key,
final long timestamp) {
final long startNs = time.nanoseconds();
try {
final byte[] result = inner.fetch(keyBytes(key), timestamp);
@ -134,13 +140,15 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
}
return serdes.valueFrom(result);
} finally {
metrics.recordLatency(this.fetchTime, startNs, time.nanoseconds());
metrics.recordLatency(fetchTime, startNs, time.nanoseconds());
}
}
@SuppressWarnings("deprecation")
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
public WindowStoreIterator<V> fetch(final K key,
final long timeFrom,
final long timeTo) {
return new MeteredWindowStoreIterator<>(inner.fetch(keyBytes(key), timeFrom, timeTo),
fetchTime,
metrics,
@ -155,22 +163,28 @@ public class MeteredWindowStore<K, V> extends WrappedStateStore.AbstractStateSto
@SuppressWarnings("deprecation")
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(inner.fetchAll(timeFrom, timeTo),
fetchTime,
metrics,
serdes,
time);
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(
inner.fetchAll(timeFrom, timeTo),
fetchTime,
metrics,
serdes,
time);
}
@SuppressWarnings("deprecation")
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
fetchTime,
metrics,
serdes,
time);
public KeyValueIterator<Windowed<K>, V> fetch(final K from,
final K to,
final long timeFrom,
final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(
inner.fetch(keyBytes(from), keyBytes(to), timeFrom, timeTo),
fetchTime,
metrics,
serdes,
time);
}
@Override

View File

@ -21,9 +21,10 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static java.util.Collections.singletonList;
/**
* A wrapper over all of the {@link StateStoreProvider}s in a Topology
*/
@ -47,10 +48,11 @@ public class QueryableStoreProvider {
* @param <T> The expected type of the returned store
* @return A composite object that wraps the store instances.
*/
public <T> T getStore(final String storeName, final QueryableStoreType<T> queryableStoreType) {
public <T> T getStore(final String storeName,
final QueryableStoreType<T> queryableStoreType) {
final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
if (!globalStore.isEmpty()) {
return queryableStoreType.create(new WrappingStoreProvider(Collections.<StateStoreProvider>singletonList(globalStoreProvider)), storeName);
return queryableStoreType.create(new WrappingStoreProvider(singletonList(globalStoreProvider)), storeName);
}
final List<T> allStores = new ArrayList<>();
for (final StateStoreProvider storeProvider : storeProviders) {

View File

@ -103,7 +103,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
this(name, DB_FILE_DIR);
}
RocksDBStore(final String name, final String parentDir) {
RocksDBStore(final String name,
final String parentDir) {
this.name = name;
this.parentDir = parentDir;
}
@ -222,7 +223,6 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
}
void toggleDbForBulkLoading(final boolean prepareForBulkload) {
if (prepareForBulkload) {
// if the store is not empty, we need to compact to get around the num.levels check
// for bulk loading
@ -434,7 +434,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]> {
}
}
private class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements KeyValueIterator<Bytes, byte[]> {
private class RocksDbIterator
extends AbstractIterator<KeyValue<Bytes, byte[]>>
implements KeyValueIterator<Bytes, byte[]> {
private final String storeName;
private final RocksIterator iter;

View File

@ -39,11 +39,16 @@ class StoreChangeLogger<K, V> {
private final ProcessorContext context;
private final RecordCollector collector;
StoreChangeLogger(final String storeName, final ProcessorContext context, final StateSerdes<K, V> serialization) {
StoreChangeLogger(final String storeName,
final ProcessorContext context,
final StateSerdes<K, V> serialization) {
this(storeName, context, context.taskId().partition, serialization);
}
private StoreChangeLogger(final String storeName, final ProcessorContext context, final int partition, final StateSerdes<K, V> serialization) {
private StoreChangeLogger(final String storeName,
final ProcessorContext context,
final int partition,
final StateSerdes<K, V> serialization) {
this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
this.context = context;
this.partition = partition;
@ -51,7 +56,8 @@ class StoreChangeLogger<K, V> {
this.collector = ((RecordCollector.Supplier) context).recordCollector();
}
void logChange(final K key, final V value) {
void logChange(final K key,
final V value) {
if (collector != null) {
final Serializer<K> keySerializer = serialization.keySerializer();
final Serializer<V> valueSerializer = serialization.valueSerializer();

View File

@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {
private final WindowBytesStoreSupplier storeSupplier;
@ -37,22 +36,24 @@ public class WindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowS
@Override
public WindowStore<K, V> build() {
return new MeteredWindowStore<>(maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
storeSupplier.metricsScope(),
time,
keySerde,
valueSerde);
return new MeteredWindowStore<>(
maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
storeSupplier.metricsScope(),
time,
keySerde,
valueSerde);
}
private WindowStore<Bytes, byte[]> maybeWrapCaching(final WindowStore<Bytes, byte[]> inner) {
if (!enableCaching) {
return inner;
}
return new CachingWindowStore<>(inner,
keySerde,
valueSerde,
storeSupplier.windowSize(),
storeSupplier.segmentIntervalMs());
return new CachingWindowStore<>(
inner,
keySerde,
valueSerde,
storeSupplier.windowSize(),
storeSupplier.segmentIntervalMs());
}
private WindowStore<Bytes, byte[]> maybeWrapLogging(final WindowStore<Bytes, byte[]> inner) {

View File

@ -30,7 +30,7 @@ public class WrappingStoreProvider implements StateStoreProvider {
private final List<StateStoreProvider> storeProviders;
public WrappingStoreProvider(final List<StateStoreProvider> storeProviders) {
WrappingStoreProvider(final List<StateStoreProvider> storeProviders) {
this.storeProviders = storeProviders;
}
@ -42,11 +42,11 @@ public class WrappingStoreProvider implements StateStoreProvider {
* @param <T> The type of the Store, for example, {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore}
* @return a List of all the stores with the storeName and are accepted by {@link QueryableStoreType#accepts(StateStore)}
*/
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> type) {
public <T> List<T> stores(final String storeName,
final QueryableStoreType<T> type) {
final List<T> allStores = new ArrayList<>();
for (final StateStoreProvider provider : storeProviders) {
final List<T> stores =
provider.stores(storeName, type);
final List<T> stores = provider.stores(storeName, type);
allStores.addAll(stores);
}
if (allStores.isEmpty()) {

View File

@ -33,8 +33,8 @@ import java.util.TreeSet;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.hamcrest.core.IsCollectionContaining.hasItems;
import static org.hamcrest.core.IsIterableContaining.hasItem;
import static org.hamcrest.core.IsIterableContaining.hasItems;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertTrue;