KAFKA-12360: Document new time semantics (#11003)

Update the docs for task idling, since the semantics have
changed in 3.0.

Reviewers: Jim Galasyn <jim.galasyn@confluent.io>, Luke Chen <showuon@gmail.com>, Boyang Chen <boyang@apache.org>
This commit is contained in:
John Roesler 2021-07-12 16:16:29 -05:00 committed by GitHub
parent 2b8d41b468
commit bfdef11b97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 6 deletions

View File

@ -243,8 +243,21 @@ settings.put(... , ...);</code></pre>
</tr>
<tr class="row-even"><td>max.task.idle.ms</td>
<td>Medium</td>
<td colspan="2">Maximum amount of time in milliseconds a stream task will stay idle while waiting for all partitions to contain data
and avoid potential out-of-order record processing across multiple input streams.</td>
<td colspan="2">
<p>
This config controls whether joins and merges may produce out-of-order results.
The config value is the maximum amount of time in milliseconds a stream task will stay idle
when it is fully caught up on some (but not all) input partitions
to wait for producers to send additional records and avoid potential
out-of-order record processing across multiple input streams.
The default (zero) does not wait for producers to send more records,
but it does wait to fetch data that is already present on the brokers.
This default means that for records that are already present on the brokers,
Streams will process them in timestamp order.
Set to -1 to disable idling entirely and process any locally available data,
even though doing so may produce out-of-order processing.
</p>
</td>
<td>0 milliseconds</td>
</tr>
<tr class="row-odd"><td>max.warmup.replicas</td>
@ -602,8 +615,54 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
<span id="streams-developer-guide-max-task-idle-ms"></span><h4><a class="toc-backref" href="#id28">max.task.idle.ms</a><a class="headerlink" href="#max-task-idle-ms" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
The maximum amount of time a task will idle without processing data when waiting for all of its input partition buffers to contain records. This can help avoid potential out-of-order
processing when the task has multiple input streams, as in a join, for example. Setting this to a nonzero value may increase latency but will improve time synchronization.
<p>
This configuration controls how long Streams will wait to fetch data in order to
provide in-order processing semantics.
</p>
<p>
When processing a task that has multiple input partitions (as in a join or merge),
Streams needs to choose which partition to process the next record from.
When all input partitions have locally buffered data, Streams picks the partition
whose next record has the lowest timestamp. This has the desirable effect of
collating the input partitions in timestamp order, which is generally what you
want in a streaming join or merge. However, when Streams does not have any data
buffered locally for one of the partitions, it does not know whether the next
record for that partition will have a lower or higher timestamp than the remaining
partitions' records.
</p>
<p>
There are two cases to consider: either there is data in that partition on the
broker that Streams has not fetched yet, or Streams is fully caught up with that
partition on the broker, and the producers simply haven't produced any new records
since Streams polled the last batch.
</p>
<p>
The default value of
<code class="docutils literal"><span class="pre">0</span></code>
causes Streams to delay processing a task when it detects that it has no locally
buffered data for a partition, but there is data available on the brokers.
Specifically, when there is an empty partition in the local buffer, but Streams
has a non-zero lag for that partition. However, as soon as Streams catches up to
the broker, it will continue processing, even if there is no data in one of the
partitions. That is, it will not wait for new data to be <em>produced</em>.
This default is designed to sacrifice some throughput in exchange for intuitively
correct join semantics.
</p>
<p>
Any config value greater than zero indicates the number of <em>extra</em>
milliseconds that Streams will wait if it has a caught-up but empty partition.
In other words, this is the amount of time to wait for new data to be produced
to the input partitions to ensure in-order processing of data in the event
of a slow producer.
</p>
<p>
The config value of
<code class="docutils literal"><span class="pre">-1</span></code>
indicates that Streams will never wait to buffer empty partitions before choosing
the next record by timestamp, which achieves maximum throughput at the expense of
introducing out-of-order processing.
</p>
</div>
</blockquote>
</div>

View File

@ -95,6 +95,23 @@
</p>
<h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API changes in 3.0.0</a></h3>
<p>
We improved the semantics of
<a href="/documentation/streams/developer-guide/config-streams.html#max-task-idle-ms">task idling (<code>max.task.idle.ms</code>)</a>.
Now Streams provides stronger in-order join and merge processing semantics.
Streams's new default pauses processing on tasks with multiple input partitions
when one of the partitions has no data buffered locally but has a non-zero lag. In other
words, Streams will wait to fetch records that are already available on the broker. This
results in improved join semantics, since it allows Streams to interleave the two input
partitions in timestamp order instead of just processing whichever partition happens to be
buffered. There is an option to disable this new behavior, and there is also an option to
make Streams wait even longer for new records to be <em>produced</em> to the input partitions,
which you can use to get stronger time semantics when you know some of your producers may be
slow. See the
<a href="/documentation/streams/developer-guide/config-streams.html#max-task-idle-ms">config reference</a>
for more information, and <a href="https://cwiki.apache.org/confluence/x/JSXZCQ">KIP-695</a>
for the larger context of this change.
</p>
<p>
Interactive Queries may throw new exceptions for different errors:
</p>

View File

@ -427,8 +427,18 @@ public class StreamsConfig extends AbstractConfig {
/** {@code max.task.idle.ms} */
public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
private static final String MAX_TASK_IDLE_MS_DOC = "Maximum amount of time in milliseconds a stream task will stay idle when not all of its partition buffers contain records," +
" to avoid potential out-of-order record processing across multiple input streams.";
private static final String MAX_TASK_IDLE_MS_DOC = "This config controls whether joins and merges"
+ " may produce out-of-order results."
+ " The config value is the maximum amount of time in milliseconds a stream task will stay idle"
+ " when it is fully caught up on some (but not all) input partitions"
+ " to wait for producers to send additional records and avoid potential"
+ " out-of-order record processing across multiple input streams."
+ " The default (zero) does not wait for producers to send more records,"
+ " but it does wait to fetch data that is already present on the brokers."
+ " This default means that for records that are already present on the brokers,"
+ " Streams will process them in timestamp order."
+ " Set to -1 to disable idling entirely and process any locally available data,"
+ " even though doing so may produce out-of-order processing.";
/** {@code max.warmup.replicas} */
public static final String MAX_WARMUP_REPLICAS_CONFIG = "max.warmup.replicas";