mirror of https://github.com/apache/kafka.git
MINOR: KIP-138 renaming of string names
Author: Guozhang Wang <wangguoz@gmail.com> Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Damian Guy <damian.guy@gmail.com> Closes #3796 from guozhangwang/kip-138-minor-renames
This commit is contained in:
parent
3f155eaa23
commit
9b85cf9ed0
|
@ -115,7 +115,7 @@
|
||||||
this.context = context;
|
this.context = context;
|
||||||
|
|
||||||
// schedule a punctuation method every 1000 milliseconds.
|
// schedule a punctuation method every 1000 milliseconds.
|
||||||
this.context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator() {
|
this.context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
|
||||||
@Override
|
@Override
|
||||||
public void punctuate(long timestamp) {
|
public void punctuate(long timestamp) {
|
||||||
KeyValueIterator<String, Long> iter = this.kvStore.all();
|
KeyValueIterator<String, Long> iter = this.kvStore.all();
|
||||||
|
|
|
@ -95,8 +95,8 @@
|
||||||
<p>
|
<p>
|
||||||
Before this, users could only schedule based on stream time (i.e. <code>PunctuationType.STREAM_TIME</code>) and hence the <code>punctuate</code> function was data-driven only because stream time is determined (and advanced forward) by the timestamps derived from the input data.
|
Before this, users could only schedule based on stream time (i.e. <code>PunctuationType.STREAM_TIME</code>) and hence the <code>punctuate</code> function was data-driven only because stream time is determined (and advanced forward) by the timestamps derived from the input data.
|
||||||
If there is no data arriving at the processor, the stream time would not advance and hence punctuation will not be triggered.
|
If there is no data arriving at the processor, the stream time would not advance and hence punctuation will not be triggered.
|
||||||
On the other hand, When wall-clock time (i.e. <code>PunctuationType.SYSTEM_TIME</code>) is used, <code>punctuate</code> will be triggered purely based on wall-clock time.
|
On the other hand, When wall-clock time (i.e. <code>PunctuationType.WALL_CLOCK_TIME</code>) is used, <code>punctuate</code> will be triggered purely based on wall-clock time.
|
||||||
So for example if the <code>Punctuator</code> function is scheduled based on <code>PunctuationType.SYSTEM_TIME</code>, if these 60 records were processed within 20 seconds,
|
So for example if the <code>Punctuator</code> function is scheduled based on <code>PunctuationType.WALL_CLOCK_TIME</code>, if these 60 records were processed within 20 seconds,
|
||||||
<code>punctuate</code> would be called 2 times (one time every 10 seconds);
|
<code>punctuate</code> would be called 2 times (one time every 10 seconds);
|
||||||
if these 60 records were processed within 5 seconds, then no <code>punctuate</code> would be called at all.
|
if these 60 records were processed within 5 seconds, then no <code>punctuate</code> would be called at all.
|
||||||
Users can schedule multiple <code>Punctuator</code> callbacks with different <code>PunctuationType</code>s within the same processor by simply calling <code>ProcessorContext#schedule</code> multiple times inside processor's <code>init()</code> method.
|
Users can schedule multiple <code>Punctuator</code> callbacks with different <code>PunctuationType</code>s within the same processor by simply calling <code>ProcessorContext#schedule</code> multiple times inside processor's <code>init()</code> method.
|
||||||
|
|
|
@ -830,7 +830,7 @@ public interface KStream<K, V> {
|
||||||
* this.state = context.getStateStore("myTransformState");
|
* this.state = context.getStateStore("myTransformState");
|
||||||
* // punctuate each 1000ms; can access this.state
|
* // punctuate each 1000ms; can access this.state
|
||||||
* // can emit as many new KeyValue pairs as required via this.context#forward()
|
* // can emit as many new KeyValue pairs as required via this.context#forward()
|
||||||
* context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..));
|
* context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* KeyValue transform(K key, V value) {
|
* KeyValue transform(K key, V value) {
|
||||||
|
@ -903,7 +903,7 @@ public interface KStream<K, V> {
|
||||||
*
|
*
|
||||||
* void init(ProcessorContext context) {
|
* void init(ProcessorContext context) {
|
||||||
* this.state = context.getStateStore("myValueTransformState");
|
* this.state = context.getStateStore("myValueTransformState");
|
||||||
* context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
|
* context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* NewValueType transform(V value) {
|
* NewValueType transform(V value) {
|
||||||
|
@ -969,7 +969,7 @@ public interface KStream<K, V> {
|
||||||
*
|
*
|
||||||
* void init(ProcessorContext context) {
|
* void init(ProcessorContext context) {
|
||||||
* this.state = context.getStateStore("myProcessorState");
|
* this.state = context.getStateStore("myProcessorState");
|
||||||
* context.schedule(1000, PunctuationType.SYSTEM_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
|
* context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, new Punctuator(..)); // punctuate each 1000ms, can access this.state
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* void process(K key, V value) {
|
* void process(K key, V value) {
|
||||||
|
|
|
@ -96,14 +96,14 @@ public interface ProcessorContext {
|
||||||
* <li>{@link PunctuationType#STREAM_TIME} - uses "stream time", which is advanced by the processing of messages
|
* <li>{@link PunctuationType#STREAM_TIME} - uses "stream time", which is advanced by the processing of messages
|
||||||
* in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
|
* in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
|
||||||
* <b>NOTE:</b> Only advanced if messages arrive</li>
|
* <b>NOTE:</b> Only advanced if messages arrive</li>
|
||||||
* <li>{@link PunctuationType#SYSTEM_TIME} - uses system time (the wall-clock time),
|
* <li>{@link PunctuationType#WALL_CLOCK_TIME} - uses system time (the wall-clock time),
|
||||||
* which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
|
* which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
|
||||||
* independent of whether new messages arrive. <b>NOTE:</b> This is best effort only as its granularity is limited
|
* independent of whether new messages arrive. <b>NOTE:</b> This is best effort only as its granularity is limited
|
||||||
* by how long an iteration of the processing loop takes to complete</li>
|
* by how long an iteration of the processing loop takes to complete</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
* @param interval the time interval between punctuations
|
* @param interval the time interval between punctuations
|
||||||
* @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#SYSTEM_TIME}
|
* @param type one of: {@link PunctuationType#STREAM_TIME}, {@link PunctuationType#WALL_CLOCK_TIME}
|
||||||
* @param callback a function consuming timestamps representing the current stream or system time
|
* @param callback a function consuming timestamps representing the current stream or system time
|
||||||
* @return a handle allowing cancellation of the punctuation schedule established by this method
|
* @return a handle allowing cancellation of the punctuation schedule established by this method
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -22,7 +22,7 @@ package org.apache.kafka.streams.processor;
|
||||||
* <li>STREAM_TIME - uses "stream time", which is advanced by the processing of messages
|
* <li>STREAM_TIME - uses "stream time", which is advanced by the processing of messages
|
||||||
* in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
|
* in accordance with the timestamp as extracted by the {@link TimestampExtractor} in use.
|
||||||
* <b>NOTE:</b> Only advanced if messages arrive</li>
|
* <b>NOTE:</b> Only advanced if messages arrive</li>
|
||||||
* <li>SYSTEM_TIME - uses system time (the wall-clock time),
|
* <li>WALL_CLOCK_TIME - uses system time (the wall-clock time),
|
||||||
* which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
|
* which is advanced at the polling interval ({@link org.apache.kafka.streams.StreamsConfig#POLL_MS_CONFIG})
|
||||||
* independent of whether new messages arrive. <b>NOTE:</b> This is best effort only as its granularity is limited
|
* independent of whether new messages arrive. <b>NOTE:</b> This is best effort only as its granularity is limited
|
||||||
* by how long an iteration of the processing loop takes to complete</li>
|
* by how long an iteration of the processing loop takes to complete</li>
|
||||||
|
@ -30,5 +30,5 @@ package org.apache.kafka.streams.processor;
|
||||||
*/
|
*/
|
||||||
public enum PunctuationType {
|
public enum PunctuationType {
|
||||||
STREAM_TIME,
|
STREAM_TIME,
|
||||||
SYSTEM_TIME,
|
WALL_CLOCK_TIME,
|
||||||
}
|
}
|
||||||
|
|
|
@ -524,7 +524,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case STREAM_TIME:
|
case STREAM_TIME:
|
||||||
return streamTimePunctuationQueue.schedule(schedule);
|
return streamTimePunctuationQueue.schedule(schedule);
|
||||||
case SYSTEM_TIME:
|
case WALL_CLOCK_TIME:
|
||||||
return systemTimePunctuationQueue.schedule(schedule);
|
return systemTimePunctuationQueue.schedule(schedule);
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
|
throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
|
||||||
|
@ -563,7 +563,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
|
||||||
public boolean maybePunctuateSystemTime() {
|
public boolean maybePunctuateSystemTime() {
|
||||||
final long timestamp = time.milliseconds();
|
final long timestamp = time.milliseconds();
|
||||||
|
|
||||||
return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.SYSTEM_TIME, this);
|
return systemTimePunctuationQueue.mayPunctuate(timestamp, PunctuationType.WALL_CLOCK_TIME, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -96,7 +96,7 @@ public class StreamTaskTest {
|
||||||
private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(topic1, intDeserializer, intDeserializer);
|
private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(topic1, intDeserializer, intDeserializer);
|
||||||
private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(topic2, intDeserializer, intDeserializer);
|
private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(topic2, intDeserializer, intDeserializer);
|
||||||
private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode<>(10L);
|
private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode<>(10L);
|
||||||
private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.SYSTEM_TIME);
|
private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10L, PunctuationType.WALL_CLOCK_TIME);
|
||||||
|
|
||||||
private final ProcessorTopology topology = new ProcessorTopology(
|
private final ProcessorTopology topology = new ProcessorTopology(
|
||||||
Arrays.<ProcessorNode>asList(source1, source2, processorStreamTime, processorSystemTime),
|
Arrays.<ProcessorNode>asList(source1, source2, processorStreamTime, processorSystemTime),
|
||||||
|
@ -405,7 +405,7 @@ public class StreamTaskTest {
|
||||||
assertTrue(task.maybePunctuateSystemTime());
|
assertTrue(task.maybePunctuateSystemTime());
|
||||||
time.sleep(10);
|
time.sleep(10);
|
||||||
assertTrue(task.maybePunctuateSystemTime());
|
assertTrue(task.maybePunctuateSystemTime());
|
||||||
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now + 10, now + 20, now + 30);
|
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10, now + 20, now + 30);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -414,7 +414,7 @@ public class StreamTaskTest {
|
||||||
assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
|
assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
|
||||||
time.sleep(9);
|
time.sleep(9);
|
||||||
assertFalse(task.maybePunctuateSystemTime());
|
assertFalse(task.maybePunctuateSystemTime());
|
||||||
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now);
|
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -425,7 +425,7 @@ public class StreamTaskTest {
|
||||||
processorSystemTime.supplier.scheduleCancellable.cancel();
|
processorSystemTime.supplier.scheduleCancellable.cancel();
|
||||||
time.sleep(10);
|
time.sleep(10);
|
||||||
assertFalse(task.maybePunctuateSystemTime());
|
assertFalse(task.maybePunctuateSystemTime());
|
||||||
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.SYSTEM_TIME, now + 10);
|
processorSystemTime.supplier.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
Loading…
Reference in New Issue