From 9b85cf9ed02d4b3b87ea39e0c2f8c35e1e813f2f Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 6 Sep 2017 18:39:40 -0700 Subject: [PATCH] MINOR: KIP-138 renaming of string names Author: Guozhang Wang Reviewers: Bill Bejeck , Matthias J. Sax , Damian Guy Closes #3796 from guozhangwang/kip-138-minor-renames --- docs/streams/developer-guide.html | 2 +- docs/streams/upgrade-guide.html | 4 ++-- .../java/org/apache/kafka/streams/kstream/KStream.java | 6 +++--- .../apache/kafka/streams/processor/ProcessorContext.java | 4 ++-- .../apache/kafka/streams/processor/PunctuationType.java | 4 ++-- .../kafka/streams/processor/internals/StreamTask.java | 4 ++-- .../kafka/streams/processor/internals/StreamTaskTest.java | 8 ++++---- 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html index 8433bf3d69c..99467a7e446 100644 --- a/docs/streams/developer-guide.html +++ b/docs/streams/developer-guide.html @@ -115,7 +115,7 @@ this.context = context; // schedule a punctuation method every 1000 milliseconds. - this.context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator() { + this.context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() { @Override public void punctuate(long timestamp) { KeyValueIterator<String, Long> iter = this.kvStore.all(); diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 9b04a599c18..a868c91f318 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -95,8 +95,8 @@

Before this, users could only schedule based on stream time (i.e. PunctuationType.STREAM_TIME) and hence the punctuate function was data-driven only because stream time is determined (and advanced forward) by the timestamps derived from the input data. If there is no data arriving at the processor, the stream time would not advance and hence punctuation will not be triggered. - On the other hand, When wall-clock time (i.e. PunctuationType.SYSTEM_TIME) is used, punctuate will be triggered purely based on wall-clock time. - So for example if the Punctuator function is scheduled based on PunctuationType.SYSTEM_TIME, if these 60 records were processed within 20 seconds, + On the other hand, When wall-clock time (i.e. PunctuationType.WALL_CLOCK_TIME) is used, punctuate will be triggered purely based on wall-clock time. + So for example if the Punctuator function is scheduled based on PunctuationType.WALL_CLOCK_TIME, if these 60 records were processed within 20 seconds, punctuate would be called 2 times (one time every 10 seconds); if these 60 records were processed within 5 seconds, then no punctuate would be called at all. Users can schedule multiple Punctuator callbacks with different PunctuationTypes within the same processor by simply calling ProcessorContext#schedule multiple times inside processor's init() method. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 8301cba54c4..b8b5b8d4873 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -830,7 +830,7 @@ public interface KStream { * this.state = context.getStateStore("myTransformState"); * // punctuate each 1000ms; can access this.state * // can emit as many new KeyValue pairs as required via this.context#forward() - * context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..)); + * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); * } * * KeyValue transform(K key, V value) { @@ -903,7 +903,7 @@ public interface KStream { * * void init(ProcessorContext context) { * this.state = context.getStateStore("myValueTransformState"); - * context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * NewValueType transform(V value) { @@ -969,7 +969,7 @@ public interface KStream { * * void init(ProcessorContext context) { * this.state = context.getStateStore("myProcessorState"); - * context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state + * context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state * } * * void process(K key, V value) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index eef7e201847..cdf1612d065 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -96,14 +96,14 @@ public interface ProcessorContext { *

  • {@link PunctuationType#STREAM_TIME} - uses "stream time", which is advanced by the processing of messages * in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use. * NOTE: Only advanced if messages arrive
  • - *
  • {@link PunctuationType#SYSTEM_TIME} - uses system time (the wall-clock time), + *
  • {@link PunctuationType#WALL_CLOCK_TIME} - uses system time (the wall-clock time), * which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG}) * independent of whether new messages arrive. NOTE: This is best effort only as its granularity is limited * by how long an iteration of the processing loop takes to complete
  • * * * @param interval the time interval between punctuations - * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#SYSTEM_TIME} + * @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME} * @param callback a function consuming timestamps representing the current stream or system time * @return a handle allowing cancellation of the punctuation schedule established by this method */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java b/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java index 4dd9300970b..bc0003da38c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PunctuationType.java @@ -22,7 +22,7 @@ package org.apache.kafka.streams.processor; *
  • STREAM_TIME - uses "stream time", which is advanced by the processing of messages * in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use. * NOTE: Only advanced if messages arrive
  • - *
  • SYSTEM_TIME - uses system time (the wall-clock time), + *
  • WALL_CLOCK_TIME - uses system time (the wall-clock time), * which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG}) * independent of whether new messages arrive. NOTE: This is best effort only as its granularity is limited * by how long an iteration of the processing loop takes to complete
  • @@ -30,5 +30,5 @@ package org.apache.kafka.streams.processor; */ public enum PunctuationType { STREAM_TIME, - SYSTEM_TIME, + WALL_CLOCK_TIME, } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 1fe567e9170..288a597a657 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -524,7 +524,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator switch (type) { case STREAM_TIME: return streamTimePunctuationQueue.schedule(schedule); - case SYSTEM_TIME: + case WALL_CLOCK_TIME: return systemTimePunctuationQueue.schedule(schedule); default: throw new IllegalArgumentException("Unrecognized PunctuationType: " + type); @@ -563,7 +563,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator public boolean maybePunctuateSystemTime() { final long timestamp = time.milliseconds(); - return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.SYSTEM_TIME, this); + return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 61d78754ef7..a9d43bacd5e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -96,7 +96,7 @@ public class StreamTaskTest { private final MockSourceNode source1 = new MockSourceNode<>(topic1, intDeserializer, intDeserializer); private final MockSourceNode source2 = new MockSourceNode<>(topic2, intDeserializer, intDeserializer); private final MockProcessorNode processorStreamTime = new MockProcessorNode<>(10L); - private final MockProcessorNode processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.SYSTEM_TIME); + private final MockProcessorNode processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.WALL_CLOCK_TIME); private final ProcessorTopology topology = new ProcessorTopology( Arrays.asList(source1, source2, processorStreamTime, processorSystemTime), @@ -405,7 +405,7 @@ public class StreamTaskTest { assertTrue(task.maybePunctuateSystemTime()); time.sleep(10); assertTrue(task.maybePunctuateSystemTime()); - processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now + 10, now + 20, now + 30); + processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30); } @Test @@ -414,7 +414,7 @@ public class StreamTaskTest { assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate time.sleep(9); assertFalse(task.maybePunctuateSystemTime()); - processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now); + processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now); } @Test @@ -425,7 +425,7 @@ public class StreamTaskTest { processorSystemTime.supplier.scheduleCancellable.cancel(); time.sleep(10); assertFalse(task.maybePunctuateSystemTime()); - processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now + 10); + processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10); } @SuppressWarnings("unchecked")