KAFKA-18023: Enforcing Explicit Naming for Kafka Streams Internal Topics (#18233)

This pull request implements KIP-1111. It adds a configuration that
prevents a Kafka Streams application from starting if any of its
internal topics have auto-generated names, thereby enforcing explicit
naming for all internal topics and enhancing the stability of the
application’s topology.

- Repartition Topics:

All repartition topics are created in the
KStreamImpl.createRepartitionedSource(...) static method. This method
receives either a name explicitly provided by the user or null, and
then builds the final repartition topic name.
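
For illustration, the existing DSL hook that supplies this name explicitly is
Repartitioned.as(...); a minimal sketch (the name "orders-by-user" is an
example, not from this PR; assumes a KStream<String, String> stream in scope):

    // With an explicit name the repartition topic becomes
    // <application.id>-orders-by-user-repartition instead of an auto-generated name.
    final KStream<String, String> repartitioned =
        stream.repartition(Repartitioned.<String, String>as("orders-by-user"));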

- Changelog Topics and State Store Names:

There are several scenarios where these are created:
  In the MaterializedInternal constructor.
  During KStream/KStream joins.
  During KStream/KTable joins with grace periods.
  When key-value buffers are used in suppressions.
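
Each of these call sites already has an explicit-naming hook in the DSL; a
minimal sketch of the first three (assumes the usual
org.apache.kafka.streams.kstream imports, java.time.Duration, and
KStream<String, String> stream/left/right and KTable<String, String> table
in scope; all names are illustrative):

    // Materialized path: naming the store also names its changelog.
    stream.groupByKey().count(Materialized.as("count-store"));

    // KStream/KStream join: StreamJoined names the join window stores.
    left.join(right, (v1, v2) -> v1 + v2,
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
        StreamJoined.as("order-join"));

    // KStream/KTable join with a grace period: Joined carries the name.
    stream.join(table, (v1, v2) -> v1 + v2,
        Joined.<String, String, String>as("enrich-join").withGracePeriod(Duration.ofMinutes(1)));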

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Sophie Blee-Goldman <sophie@responsive.dev>
Author: Sebastien Viale, 2025-02-24 11:41:42 +01:00 (committed by GitHub)
parent 2880e04129
commit 3ce5f23295
17 changed files with 881 additions and 61 deletions


@ -79,6 +79,7 @@ settings.put(... , ...);</code></pre>
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li> <li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
<li><a class="reference internal" href="#deserialization-exception-handler" id="id7">deserialization.exception.handler</a></li> <li><a class="reference internal" href="#deserialization-exception-handler" id="id7">deserialization.exception.handler</a></li>
<li><a class="reference internal" href="#enable-metrics-push" id="id43">enable.metrics.push</a></li> <li><a class="reference internal" href="#enable-metrics-push" id="id43">enable.metrics.push</a></li>
<li><a class="reference internal" href="#ensure-explicit-internal-resource-naming" id="id46">ensure.explicit.internal.resource.naming</a></li>
<li><a class="reference internal" href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li> <li><a class="reference internal" href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
<li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li> <li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li>
<li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li> <li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li>
@ -348,17 +349,26 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</td> </td>
<td><code>BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers</code></td> <td><code>BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers</code></td>
</tr> </tr>
<tr class="row-odd"><td>log.summary.interval.ms</td> <tr class="row-odd"><td>ensure.explicit.internal.resource.naming</td>
<td>High</td>
<td colspan="2">
Whether to enforce explicit naming for all internal resources of the topology, including internal
topics (e.g., changelog and repartition topics) and their associated state stores.
When enabled, the application will refuse to start if any internal resource has an auto-generated name.
</td>
<td><code class="docutils literal"><span class="pre">false</span></code></td>
</tr>
<tr class="row-even"><td>log.summary.interval.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The output interval in milliseconds for logging summary information (disabled if negative).</td> <td colspan="2">The output interval in milliseconds for logging summary information (disabled if negative).</td>
<td><code class="docutils literal"><span class="pre">120000</span></code> (2 minutes)</td> <td><code class="docutils literal"><span class="pre">120000</span></code> (2 minutes)</td>
</tr> </tr>
<tr class="row-even"><td>enable.metrics.push</td> <tr class="row-odd"><td>enable.metrics.push</td>
<td>Low</td> <td>Low</td>
<td colspan="2">Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client.</td> <td colspan="2">Whether to enable pushing of client metrics to the cluster, if the cluster has a client metrics subscription which matches this client.</td>
<td><code class="docutils literal"><span class="pre">true</span></code></td> <td><code class="docutils literal"><span class="pre">true</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>max.task.idle.ms</td> <tr class="row-even"><td>max.task.idle.ms</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2"> <td colspan="2">
<p> <p>
@ -377,76 +387,76 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</td> </td>
<td><code class="docutils literal"><span class="pre">0</span></code></td> <td><code class="docutils literal"><span class="pre">0</span></code></td>
</tr> </tr>
<tr class="row-even"><td>max.warmup.replicas</td> <tr class="row-odd"><td>max.warmup.replicas</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once.</td> <td colspan="2">The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once.</td>
<td><code class="docutils literal"><span class="pre">2</span></code></td> <td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>metric.reporters</td> <tr class="row-even"><td>metric.reporters</td>
<td>Low</td> <td>Low</td>
<td colspan="2">A list of classes to use as metrics reporters.</td> <td colspan="2">A list of classes to use as metrics reporters.</td>
<td>the empty list</td> <td>the empty list</td>
</tr> </tr>
<tr class="row-even"><td>metrics.num.samples</td> <tr class="row-odd"><td>metrics.num.samples</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The number of samples maintained to compute metrics.</td> <td colspan="2">The number of samples maintained to compute metrics.</td>
<td><code class="docutils literal"><span class="pre">2</span></code></td> <td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>metrics.recording.level</td> <tr class="row-even"><td>metrics.recording.level</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The highest recording level for metrics.</td> <td colspan="2">The highest recording level for metrics.</td>
<td><code class="docutils literal"><span class="pre">INFO</span></code></td> <td><code class="docutils literal"><span class="pre">INFO</span></code></td>
</tr> </tr>
<tr class="row-even"><td>metrics.sample.window.ms</td> <tr class="row-odd"><td>metrics.sample.window.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The window of time in milliseconds a metrics sample is computed over.</td> <td colspan="2">The window of time in milliseconds a metrics sample is computed over.</td>
<td><code class="docutils literal"><span class="pre">30000</span></code> (30 seconds)</td> <td><code class="docutils literal"><span class="pre">30000</span></code> (30 seconds)</td>
</tr> </tr>
<tr class="row-odd"><td>num.standby.replicas</td> <tr class="row-even"><td>num.standby.replicas</td>
<td>High</td> <td>High</td>
<td colspan="2">The number of standby replicas for each task.</td> <td colspan="2">The number of standby replicas for each task.</td>
<td><code class="docutils literal"><span class="pre">0</span></code></td> <td><code class="docutils literal"><span class="pre">0</span></code></td>
</tr> </tr>
<tr class="row-even"><td>num.stream.threads</td> <tr class="row-odd"><td>num.stream.threads</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">The number of threads to execute stream processing.</td> <td colspan="2">The number of threads to execute stream processing.</td>
<td><code class="docutils literal"><span class="pre">1</span></code></td> <td><code class="docutils literal"><span class="pre">1</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>probing.rebalance.interval.ms</td> <tr class="row-even"><td>probing.rebalance.interval.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td> <td colspan="2">The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td>
<td><code class="docutils literal"><span class="pre">600000</span></code> (10 minutes)</td> <td><code class="docutils literal"><span class="pre">600000</span></code> (10 minutes)</td>
</tr> </tr>
<tr class="row-even"><td>processing.exception.handler</td> <tr class="row-odd"><td>processing.exception.handler</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProcessingExceptionHandler</span></code> interface.</td> <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProcessingExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">LogAndFailProcessingExceptionHandler</span></code></td> <td><code class="docutils literal"><span class="pre">LogAndFailProcessingExceptionHandler</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>processing.guarantee</td> <tr class="row-even"><td>processing.guarantee</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> <td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code>
or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+). or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+).
See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a>.</td>. See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a>.</td>.
<td><code class="docutils literal"><span class="pre">"at_least_once"</span></code></td> <td><code class="docutils literal"><span class="pre">"at_least_once"</span></code></td>
</tr> </tr>
<tr class="row-even"><td>processor.wrapper.class</td> <tr class="row-odd"><td>processor.wrapper.class</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">A class or class name implementing the <code class="docutils literal"><span class="pre">ProcessorWrapper</span></code> interface. <td colspan="2">A class or class name implementing the <code class="docutils literal"><span class="pre">ProcessorWrapper</span></code> interface.
Must be passed in when creating the topology, and will not be applied unless passed in to the appropriate constructor as a TopologyConfig. You should Must be passed in when creating the topology, and will not be applied unless passed in to the appropriate constructor as a TopologyConfig. You should
use the <code class="docutils literal"><span class="pre">StreamsBuilder#new(TopologyConfig)</span></code> constructor for DSL applications, and the use the <code class="docutils literal"><span class="pre">StreamsBuilder#new(TopologyConfig)</span></code> constructor for DSL applications, and the
<code class="docutils literal"><span class="pre">Topology#new(TopologyConfig)</span></code> constructor for PAPI applications.</td> <code class="docutils literal"><span class="pre">Topology#new(TopologyConfig)</span></code> constructor for PAPI applications.</td>
</tr> </tr>
<tr class="row-odd"><td>production.exception.handler</td> <tr class="row-even"><td>production.exception.handler</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td> <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td> <td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td>
</tr> </tr>
<tr class="row-even"><td>poll.ms</td> <tr class="row-odd"><td>poll.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The amount of time in milliseconds to block waiting for input.</td> <td colspan="2">The amount of time in milliseconds to block waiting for input.</td>
<td><code class="docutils literal"><span class="pre">100</span></code></td> <td><code class="docutils literal"><span class="pre">100</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>rack.aware.assignment.strategy</td> <tr class="row-even"><td>rack.aware.assignment.strategy</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The strategy used for rack aware assignment. Acceptable value are <td colspan="2">The strategy used for rack aware assignment. Acceptable value are
<code class="docutils literal"><span class="pre">"none"</span></code> (default), <code class="docutils literal"><span class="pre">"none"</span></code> (default),
@ -455,7 +465,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
See <a class="reference internal" href="#rack-aware-assignment-strategy"><span class="std std-ref">Rack Aware Assignment Strategy</span></a>.</td> See <a class="reference internal" href="#rack-aware-assignment-strategy"><span class="std std-ref">Rack Aware Assignment Strategy</span></a>.</td>
<td><code class="docutils literal"><span class="pre">"none"</span></code></td> <td><code class="docutils literal"><span class="pre">"none"</span></code></td>
</tr> </tr>
<tr class="row-even><td>rack.aware.assignment.tags</td> <tr class="row-odd"><td>rack.aware.assignment.tags</td>
<td>Low</td> <td>Low</td>
<td colspan="2">List of tag keys used to distribute standby replicas across Kafka Streams <td colspan="2">List of tag keys used to distribute standby replicas across Kafka Streams
clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over
@ -463,72 +473,72 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
See <a class="reference internal" href="#rack-aware-assignment-tags"><span class="std std-ref">Rack Aware Assignment Tags</span></a>.</td> See <a class="reference internal" href="#rack-aware-assignment-tags"><span class="std std-ref">Rack Aware Assignment Tags</span></a>.</td>
<td>the empty list</td> <td>the empty list</td>
</tr> </tr>
<tr class="row-odd"><td>rack.aware.assignment.non_overlap_cost</td> <tr class="row-even"><td>rack.aware.assignment.non_overlap_cost</td>
<td>Low</td> <td>Low</td>
<td colspan="2">Cost associated with moving tasks from existing assignment. <td colspan="2">Cost associated with moving tasks from existing assignment.
See <a class="reference internal" href="#rack-aware-assignment-non-overlap-cost"><span class="std std-ref">Rack Aware Assignment Non-Overlap-Cost</span></a>.</td> See <a class="reference internal" href="#rack-aware-assignment-non-overlap-cost"><span class="std std-ref">Rack Aware Assignment Non-Overlap-Cost</span></a>.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td> <td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr> </tr>
<tr class="row-even"><td>rack.aware.assignment.non_overlap_cost</td> <tr class="row-odd"><td>rack.aware.assignment.non_overlap_cost</td>
<td>Low</td> <td>Low</td>
<td colspan="2">Cost associated with cross rack traffic. <td colspan="2">Cost associated with cross rack traffic.
See <a class="reference internal" href="#rack-aware-assignment-traffic-cost"><span class="std std-ref">Rack Aware Assignment Traffic-Cost</span></a>.</td> See <a class="reference internal" href="#rack-aware-assignment-traffic-cost"><span class="std std-ref">Rack Aware Assignment Traffic-Cost</span></a>.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td> <td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>replication.factor</td> <tr class="row-even"><td>replication.factor</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">The replication factor for changelog topics and repartition topics created by the application. <td colspan="2">The replication factor for changelog topics and repartition topics created by the application.
The default of <code>-1</code> (meaning: use broker default replication factor) requires broker version 2.4 or newer.</td> The default of <code>-1</code> (meaning: use broker default replication factor) requires broker version 2.4 or newer.</td>
<td><code class="docutils literal"><span class="pre">-1</span></code></td> <td><code class="docutils literal"><span class="pre">-1</span></code></td>
</tr> </tr>
<tr class="row-even"><td>retry.backoff.ms</td> <tr class="row-odd"><td>retry.backoff.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The amount of time in milliseconds, before a request is retried.</td> <td colspan="2">The amount of time in milliseconds, before a request is retried.</td>
<td><code class="docutils literal"><span class="pre">100</span></code></td> <td><code class="docutils literal"><span class="pre">100</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>rocksdb.config.setter</td> <tr class="row-even"><td>rocksdb.config.setter</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">The RocksDB configuration.</td> <td colspan="2">The RocksDB configuration.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td> <td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr> </tr>
<tr class="row-even"><td>state.cleanup.delay.ms</td> <tr class="row-odd"><td>state.cleanup.delay.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td> <td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
<td><code class="docutils literal"><span class="pre">600000</span></code></td> (10 minutes)</td> <td><code class="docutils literal"><span class="pre">600000</span></code></td> (10 minutes)</td>
</tr> </tr>
<tr class="row-odd"><td>state.dir</td> <tr class="row-even"><td>state.dir</td>
<td>High</td> <td>High</td>
<td colspan="2">Directory location for state stores.</td> <td colspan="2">Directory location for state stores.</td>
<td><code class="docutils literal"><span class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td> <td><code class="docutils literal"><span class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td>
</tr> </tr>
<tr class="row-even"><td>task.assignor.class</td> <tr class="row-odd"><td>task.assignor.class</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">A task assignor class or class name implementing the <code>TaskAssignor</code> interface.</td> <td colspan="2">A task assignor class or class name implementing the <code>TaskAssignor</code> interface.</td>
<td>The high-availability task assignor.</td> <td>The high-availability task assignor.</td>
</tr> </tr>
<tr class="row-odd"><td>task.timeout.ms</td> <tr class="row-even"><td>task.timeout.ms</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td> <td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td>
<td><code class="docutils literal"><span class="pre">300000</span></code></td> (5 minutes)</td> <td><code class="docutils literal"><span class="pre">300000</span></code></td> (5 minutes)</td>
</tr> </tr>
<tr class="row-even"><td>topology.optimization</td> <tr class="row-odd"><td>topology.optimization</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>), <td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>),
<code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> (<code>single.store.self.join</code>). </td> <code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> (<code>single.store.self.join</code>). </td>
<td><code class="docutils literal"><span class="pre">"NO_OPTIMIZATION"</span></code></td> <td><code class="docutils literal"><span class="pre">"NO_OPTIMIZATION"</span></code></td>
</tr> </tr>
<tr class="row-odd"><td>upgrade.from</td> <tr class="row-even"><td>upgrade.from</td>
<td>Medium</td> <td>Medium</td>
<td colspan="2">The version you are upgrading from during a rolling upgrade. <td colspan="2">The version you are upgrading from during a rolling upgrade.
See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td> See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td>
<td><code class="docutils literal"><span class="pre">null</span></code></td> <td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr> </tr>
<tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td> <tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td> <td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td><code class="docutils literal"><span class="pre">86400000</span></code></td> (1 day)</td> <td><code class="docutils literal"><span class="pre">86400000</span></code></td> (1 day)</td>
</tr> </tr>
<tr class="row-odd"><td>window.size.ms</td> <tr class="row-even"><td>window.size.ms</td>
<td>Low</td> <td>Low</td>
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td> <td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td> <td><code class="docutils literal"><span class="pre">null</span></code></td>
@ -753,6 +763,18 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
<p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p> <p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
</div></blockquote> </div></blockquote>
</div> </div>
<div class="section" id="ensure-explicit-internal-resource-naming">
<h4><a class="toc-backref" href="#id46">ensure.explicit.internal.resource.naming</a><a class="headerlink" href="#ensure-explicit-internal-resource-naming" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
Whether to enforce explicit naming for all internal resources of the topology, including internal
topics (e.g., changelog and repartition topics) and their associated state stores.
When enabled, the application will refuse to start if any internal resource has an auto-generated name.
</p>
</div>
</blockquote>
</div>
<div class="section" id="rack-aware-assignment-non-overlap-cost"> <div class="section" id="rack-aware-assignment-non-overlap-cost">
<h4><a class="toc-backref" href="#id37">rack.aware.assignment.non_overlap_cost</a><a class="headerlink" href="#rack-aware-assignment-non-overlap-cost" title="Permalink to this headline"></a></h4> <h4><a class="toc-backref" href="#id37">rack.aware.assignment.non_overlap_cost</a><a class="headerlink" href="#rack-aware-assignment-non-overlap-cost" title="Permalink to this headline"></a></h4>
<blockquote> <blockquote>


@ -300,6 +300,19 @@ stream.filter((k, v) -> v != null && v.length() >= 6)
<td>Stream/Table non-stateful operations</td><td>Named</td> <td>Stream/Table non-stateful operations</td><td>Named</td>
</tr> </tr>
</table> </table>
To further enforce best practices, Kafka Streams provides a configuration option,
<code class="docutils literal"><span class="pre">ensure.explicit.internal.resource.naming</span></code>:
<pre class="line-numbers"><code class="language-java">
Properties props = new Properties();
props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
</code></pre>
This parameter ensures that all internal topics, state stores, and changelog topics have explicitly defined names. When this configuration
is enabled, a Kafka Streams application will not start if any of these components rely on auto-generated names. This guarantees
stability across topology updates, as manually defined names remain unchanged even when new processors or transformations are added.
Enforcing explicit naming is particularly important in production environments, where consistency and backward compatibility are essential
for maintaining reliable stream processing applications.
</p> </p>
</div> </div>
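
To make the failure mode concrete, a sketch of a topology that would be
rejected and its fix (illustrative names; per the documentation above, the
check runs before the application starts):

    Properties props = new Properties();
    props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);

    // Rejected: count() materializes a store with an auto-generated name,
    // so its state store and changelog topic are implicitly named.
    StreamsBuilder rejected = new StreamsBuilder();
    rejected.stream("input").groupByKey().count();

    // Accepted: the store (and therefore its changelog) is explicitly named.
    StreamsBuilder accepted = new StreamsBuilder();
    accepted.stream("input").groupByKey().count(Materialized.as("count-store"));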


@ -139,6 +139,16 @@
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>. More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p> </p>
<h3><a id="streams_api_changes_410" href="#streams_api_changes_410">Streams API changes in 4.1.0</a></h3>
<p>
The introduction of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1111:+Enforcing+Explicit+Naming+for+Kafka+Streams+Internal+Topics">KIP-1111</a>
enables you to enforce explicit naming for all internal resources of the topology, including internal topics (e.g., changelog and repartition topics) and their associated state stores.
This ensures that every internal resource is named before the Kafka Streams application is deployed, which is essential for upgrading your topology.
You can enable this feature via <code>StreamsConfig</code> using the <code>StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG</code> parameter.
When set to <code>true</code>, the application will refuse to start if any internal resource has an auto-generated name.
</p>
<h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3> <h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>
<p> <p>


@ -384,7 +384,7 @@ public class StreamsBuilder {
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>( new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()), Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()).withLoggingDisabled(),
internalStreamsBuilder, internalStreamsBuilder,
topic + "-", topic + "-",
true /* force materializing global tables */); true /* force materializing global tables */);
@ -457,7 +457,7 @@ public class StreamsBuilder {
Objects.requireNonNull(materialized, "materialized can't be null"); Objects.requireNonNull(materialized, "materialized can't be null");
final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed); final ConsumedInternal<K, V> consumedInternal = new ConsumedInternal<>(consumed);
// always use the serdes from consumed // always use the serdes from consumed
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()); materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()).withLoggingDisabled();
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"); new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-");


@ -592,6 +592,12 @@ public class StreamsConfig extends AbstractConfig {
public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." + public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." +
" The cluster must have a client metrics subscription which corresponds to a client."; " The cluster must have a client metrics subscription which corresponds to a client.";
/** {@code ensure.explicit.internal.resource.naming} */
public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG = "ensure.explicit.internal.resource.naming";
static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = "Whether to enforce explicit naming for all internal resources of the topology, including internal" +
" topics (e.g., changelog and repartition topics) and their associated state stores." +
" When enabled, the application will refuse to start if any internal resource has an auto-generated name.";
/** {@code log.summary.interval.ms} */ /** {@code log.summary.interval.ms} */
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms"; public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output interval in milliseconds for logging summary information.\n" + private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output interval in milliseconds for logging summary information.\n" +
@ -869,6 +875,11 @@ public class StreamsConfig extends AbstractConfig {
Importance.HIGH, Importance.HIGH,
STATE_DIR_DOC, STATE_DIR_DOC,
"${java.io.tmpdir}") "${java.io.tmpdir}")
.define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
Type.BOOLEAN,
false,
Importance.HIGH,
ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC)
// MEDIUM // MEDIUM


@ -55,6 +55,8 @@ import static org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_H
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC; import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC;
import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY; import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG; import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC; import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
@ -142,7 +144,12 @@ public final class TopologyConfig extends AbstractConfig {
Type.CLASS, Type.CLASS,
DSL_STORE_SUPPLIERS_CLASS_DEFAULT, DSL_STORE_SUPPLIERS_CLASS_DEFAULT,
Importance.LOW, Importance.LOW,
DSL_STORE_SUPPLIERS_CLASS_DOC); DSL_STORE_SUPPLIERS_CLASS_DOC)
.define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
Type.BOOLEAN,
false,
Importance.HIGH,
ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC);
} }
private static final Logger log = LoggerFactory.getLogger(TopologyConfig.class); private static final Logger log = LoggerFactory.getLogger(TopologyConfig.class);
@ -164,6 +171,8 @@ public final class TopologyConfig extends AbstractConfig {
public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier; public final Supplier<DeserializationExceptionHandler> deserializationExceptionHandlerSupplier;
public final Supplier<ProcessingExceptionHandler> processingExceptionHandlerSupplier; public final Supplier<ProcessingExceptionHandler> processingExceptionHandlerSupplier;
public final boolean ensureExplicitInternalResourceNaming;
public TopologyConfig(final StreamsConfig configs) { public TopologyConfig(final StreamsConfig configs) {
this(null, configs, mkObjectProperties(configs.originals())); this(null, configs, mkObjectProperties(configs.originals()));
} }
@ -272,6 +281,8 @@ public final class TopologyConfig extends AbstractConfig {
} else { } else {
dslStoreSuppliers = globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG); dslStoreSuppliers = globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
} }
ensureExplicitInternalResourceNaming = globalAppConfigs.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG);
} }
@Deprecated @Deprecated
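
The flag thus surfaces as a public field on TopologyConfig; a minimal usage
sketch (property values are illustrative; TopologyConfig(StreamsConfig) is the
constructor shown above):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);

    TopologyConfig topologyConfig = new TopologyConfig(new StreamsConfig(props));
    boolean enforce = topologyConfig.ensureExplicitInternalResourceNaming; // true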


@ -59,7 +59,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde, final Serde<VOut> valueSerde,
final String queryableName, final String queryableName,
final boolean isOutputVersioned) { final boolean isOutputVersioned) {
processRepartitions(groupPatterns, storeFactory.storeName()); processRepartitions(groupPatterns, storeFactory.storeName(), queryableName);
final Collection<GraphNode> processors = new ArrayList<>(); final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
@ -94,7 +94,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde, final Serde<VOut> valueSerde,
final String queryableName, final String queryableName,
final Windows<W> windows) { final Windows<W> windows) {
processRepartitions(groupPatterns, storeFactory.storeName()); processRepartitions(groupPatterns, storeFactory.storeName(), queryableName);
final Collection<GraphNode> processors = new ArrayList<>(); final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
@ -135,7 +135,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final String queryableName, final String queryableName,
final SessionWindows sessionWindows, final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) { final Merger<? super K, VOut> sessionMerger) {
processRepartitions(groupPatterns, storeFactory.storeName()); processRepartitions(groupPatterns, storeFactory.storeName(), queryableName);
final Collection<GraphNode> processors = new ArrayList<>(); final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
int counter = 0; int counter = 0;
@ -175,7 +175,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde, final Serde<VOut> valueSerde,
final String queryableName, final String queryableName,
final SlidingWindows slidingWindows) { final SlidingWindows slidingWindows) {
processRepartitions(groupPatterns, storeFactory.storeName()); processRepartitions(groupPatterns, storeFactory.storeName(), queryableName);
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>(); final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<GraphNode> processors = new ArrayList<>(); final Collection<GraphNode> processors = new ArrayList<>();
int counter = 0; int counter = 0;
@ -206,7 +206,8 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
} }
private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns, private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final String storeName) { final String storeName,
final String queryableName) {
for (final KGroupedStreamImpl<K, ?> repartitionReqs : groupPatterns.keySet()) { for (final KGroupedStreamImpl<K, ?> repartitionReqs : groupPatterns.keySet()) {
if (repartitionReqs.repartitionRequired) { if (repartitionReqs.repartitionRequired) {
@ -216,8 +217,9 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final String repartitionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ? final String repartitionNamePrefix = repartitionReqs.userProvidedRepartitionTopicName != null ?
repartitionReqs.userProvidedRepartitionTopicName : storeName; repartitionReqs.userProvidedRepartitionTopicName : storeName;
createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde); final boolean isRepartitionTopicNameProvidedByUser = repartitionReqs.userProvidedRepartitionTopicName != null || queryableName != null;
createRepartitionSource(repartitionNamePrefix, repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde, isRepartitionTopicNameProvidedByUser);
if (!parentNodes.containsKey(repartitionReqs)) { if (!parentNodes.containsKey(repartitionReqs)) {
final GraphNode repartitionNode = repartitionNodeBuilder.build(); final GraphNode repartitionNode = repartitionNodeBuilder.build();
builder.addGraphNode(repartitionReqs.graphNode, repartitionNode); builder.addGraphNode(repartitionReqs.graphNode, repartitionNode);
@ -270,14 +272,16 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
private <VIn> void createRepartitionSource(final String repartitionTopicNamePrefix, private <VIn> void createRepartitionSource(final String repartitionTopicNamePrefix,
final OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder, final OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,
final Serde<K> keySerde, final Serde<K> keySerde,
final Serde<?> valueSerde) { final Serde<?> valueSerde,
final boolean isRepartitionTopicNameProvidedByUser) {
KStreamImpl.createRepartitionedSource(builder, KStreamImpl.createRepartitionedSource(builder,
keySerde, keySerde,
(Serde<VIn>) valueSerde, (Serde<VIn>) valueSerde,
repartitionTopicNamePrefix, repartitionTopicNamePrefix,
null, null,
(OptimizableRepartitionNodeBuilder<K, VIn>) optimizableRepartitionNodeBuilder); (OptimizableRepartitionNodeBuilder<K, VIn>) optimizableRepartitionNodeBuilder,
isRepartitionTopicNameProvidedByUser);
} }
} }


@ -126,8 +126,10 @@ class GroupedStreamAggregateBuilder<K, V> {
if (repartitionRequired) { if (repartitionRequired) {
final OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = optimizableRepartitionNodeBuilder(); final OptimizableRepartitionNodeBuilder<K, V> repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeName; final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeName;
sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder);
sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder, userProvidedRepartitionTopicName != null || queryableStoreName != null);
// First time through we need to create a repartition node. // First time through we need to create a repartition node.
// Any subsequent calls to GroupedStreamAggregateBuilder#build we check if // Any subsequent calls to GroupedStreamAggregateBuilder#build we check if
@ -157,14 +159,16 @@ class GroupedStreamAggregateBuilder<K, V> {
* @return the new sourceName of the repartitioned source * @return the new sourceName of the repartitioned source
*/ */
private String createRepartitionSource(final String repartitionTopicNamePrefix, private String createRepartitionSource(final String repartitionTopicNamePrefix,
final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) { final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder,
final boolean isRepartitionTopicNameProvidedByUser) {
return KStreamImpl.createRepartitionedSource(builder, return KStreamImpl.createRepartitionedSource(builder,
keySerde, keySerde,
valueSerde, valueSerde,
repartitionTopicNamePrefix, repartitionTopicNamePrefix,
null, null,
optimizableRepartitionNodeBuilder); optimizableRepartitionNodeBuilder,
isRepartitionTopicNameProvidedByUser);
} }
} }


@ -328,6 +328,9 @@ public class InternalStreamsBuilder implements InternalNameProvider {
} }
} }
internalTopologyBuilder.validateCopartition(); internalTopologyBuilder.validateCopartition();
internalTopologyBuilder.checkUnprovidedNames();
} }
/** /**
@ -588,7 +591,8 @@ public class InternalStreamsBuilder implements InternalNameProvider {
valueSerde, valueSerde,
repartitionTopicName, repartitionTopicName,
null, null,
repartitionNodeBuilder repartitionNodeBuilder,
true
); );
// ensures setting the repartition topic to the name of the // ensures setting the repartition topic to the name of the
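
checkUnprovidedNames() itself is not shown in these hunks. A plausible sketch
of the validation it performs, inferred from the addImplicitInternalNames(...)
call sites in this PR (the field name and message below are assumptions, not
the actual implementation):

    // Hypothetical sketch -- not the committed code.
    private final Collection<InternalResourcesNaming> implicitInternalNames = new ArrayList<>();

    public void checkUnprovidedNames() {
        if (topologyConfigs != null
                && topologyConfigs.ensureExplicitInternalResourceNaming
                && !implicitInternalNames.isEmpty()) {
            final StringBuilder msg = new StringBuilder(
                "ensure.explicit.internal.resource.naming is enabled, but these resources have auto-generated names:");
            for (final InternalResourcesNaming naming : implicitInternalNames) {
                if (naming.repartitionTopic() != null) {
                    msg.append("\n  repartition topic: ").append(naming.repartitionTopic());
                }
                if (naming.changelogTopic() != null) {
                    msg.append("\n  changelog topic: ").append(naming.changelogTopic());
                }
                if (naming.stateStore() != null) {
                    msg.append("\n  state store: ").append(naming.stateStore());
                }
            }
            throw new TopologyException(msg.toString());
        }
    }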


@ -58,6 +58,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStore;
@ -525,7 +526,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
valueSerde, valueSerde,
name, name,
repartitionedInternal.streamPartitioner(), repartitionedInternal.streamPartitioner(),
unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(internalTopicProperties) unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(internalTopicProperties),
repartitionedInternal.name() != null
); );
final UnoptimizableRepartitionNode<K, V> unoptimizableRepartitionNode = unoptimizableRepartitionNodeBuilder.build(); final UnoptimizableRepartitionNode<K, V> unoptimizableRepartitionNode = unoptimizableRepartitionNodeBuilder.build();
@ -633,7 +635,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
valueSerdeOverride, valueSerdeOverride,
name, name,
null, null,
repartitionNodeBuilder repartitionNodeBuilder,
namedInternal.name() != null
); );
tableParentNode = repartitionNodeBuilder.build(); tableParentNode = repartitionNodeBuilder.build();
@ -895,21 +898,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
if (joinThis.repartitionRequired) { if (joinThis.repartitionRequired) {
final String joinThisName = joinThis.name; final String joinThisName = joinThis.name;
final String leftJoinRepartitionTopicName = name.suffixWithOrElseGet("-left", joinThisName); final String leftJoinRepartitionTopicName = name.suffixWithOrElseGet("-left", joinThisName);
joinThis = joinThis.repartitionForJoin( joinThis = joinThis.repartitionForJoin(
leftJoinRepartitionTopicName, leftJoinRepartitionTopicName,
streamJoinedInternal.keySerde(), streamJoinedInternal.keySerde(),
streamJoinedInternal.valueSerde() streamJoinedInternal.valueSerde(),
); name.name() != null);
} }
if (joinOther.repartitionRequired) { if (joinOther.repartitionRequired) {
final String joinOtherName = joinOther.name; final String joinOtherName = joinOther.name;
final String rightJoinRepartitionTopicName = name.suffixWithOrElseGet("-right", joinOtherName); final String rightJoinRepartitionTopicName = name.suffixWithOrElseGet("-right", joinOtherName);
joinOther = joinOther.repartitionForJoin( joinOther = joinOther.repartitionForJoin(
rightJoinRepartitionTopicName, rightJoinRepartitionTopicName,
streamJoinedInternal.keySerde(), streamJoinedInternal.keySerde(),
streamJoinedInternal.otherValueSerde() streamJoinedInternal.otherValueSerde(),
); name.name() != null);
} }
joinThis.ensureCopartitionWith(Collections.singleton(joinOther)); joinThis.ensureCopartitionWith(Collections.singleton(joinOther));
@ -928,7 +933,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
*/ */
private KStreamImpl<K, V> repartitionForJoin(final String repartitionName, private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,
final Serde<K> keySerdeOverride, final Serde<K> keySerdeOverride,
final Serde<V> valueSerdeOverride) { final Serde<V> valueSerdeOverride,
final boolean isRepartitionTopicNameProvidedByUser) {
final Serde<K> repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : keySerde; final Serde<K> repartitionKeySerde = keySerdeOverride != null ? keySerdeOverride : keySerde;
final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valueSerde; final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? valueSerdeOverride : valueSerde;
final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder = final OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder =
@ -942,7 +948,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
repartitionValueSerde, repartitionValueSerde,
repartitionName, repartitionName,
null, null,
optimizableRepartitionNodeBuilder); optimizableRepartitionNodeBuilder,
isRepartitionTopicNameProvidedByUser);
if (repartitionNode == null || !name.equals(repartitionName)) { if (repartitionNode == null || !name.equals(repartitionName)) {
repartitionNode = optimizableRepartitionNodeBuilder.build(); repartitionNode = optimizableRepartitionNodeBuilder.build();
@ -965,11 +972,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final Serde<Value> valueSerde, final Serde<Value> valueSerde,
final String repartitionTopicNamePrefix, final String repartitionTopicNamePrefix,
final StreamPartitioner<Key, Value> streamPartitioner, final StreamPartitioner<Key, Value> streamPartitioner,
final BaseRepartitionNodeBuilder<Key, Value, RepartitionNode> baseRepartitionNodeBuilder final BaseRepartitionNodeBuilder<Key, Value, RepartitionNode> baseRepartitionNodeBuilder,
) { final boolean isRepartitionTopicNameProvidedByUser) {
final String repartitionTopicName = repartitionTopicNamePrefix.endsWith(REPARTITION_TOPIC_SUFFIX) ? final String repartitionTopicName = repartitionTopicNamePrefix.endsWith(REPARTITION_TOPIC_SUFFIX) ?
repartitionTopicNamePrefix : repartitionTopicNamePrefix :
repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX; repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;
if (!isRepartitionTopicNameProvidedByUser) {
builder.internalTopologyBuilder().addImplicitInternalNames(InternalResourcesNaming.builder().withRepartitionTopic(repartitionTopicName).build());
}
// Always need to generate the names to burn index counter for compatibility // Always need to generate the names to burn index counter for compatibility
final String genSinkName = builder.newProcessorName(SINK_NAME); final String genSinkName = builder.newProcessorName(SINK_NAME);
@ -1051,7 +1062,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin( final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
name != null ? name : this.name, name != null ? name : this.name,
joinedInternal.keySerde(), joinedInternal.keySerde(),
joinedInternal.leftValueSerde() joinedInternal.leftValueSerde(),
name != null
); );
return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, false); return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, false);
} else { } else {
@ -1091,7 +1103,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin( final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(
name != null ? name : this.name, name != null ? name : this.name,
joinedInternal.keySerde(), joinedInternal.keySerde(),
joinedInternal.leftValueSerde() joinedInternal.leftValueSerde(),
name != null
); );
return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, true); return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joinedInternal, true);
} else { } else {
@ -1124,6 +1137,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
joinedInternal.gracePeriod(), joinedInternal.gracePeriod(),
name) name)
); );
if (joinedInternal.name() == null) {
final InternalResourcesNaming internalResourcesNaming = InternalResourcesNaming.builder().withStateStore(bufferName).withChangelogTopic(bufferName + "-changelog").build();
internalTopologyBuilder().addImplicitInternalNames(internalResourcesNaming);
}
} }
final ProcessorSupplier<K, V, K, VOut> processorSupplier = new KStreamKTableJoin<>( final ProcessorSupplier<K, V, K, VOut> processorSupplier = new KStreamKTableJoin<>(


@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode; import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode; import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper; import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.Stores;
@ -168,6 +169,14 @@ class KStreamImplJoin {
); );
} }
if (userProvidedBaseStoreName == null) {
addInternalResourceName(thisWindowStore);
addInternalResourceName(otherWindowStore);
if (outerJoinWindowStore.isPresent()) {
addInternalResourceName(outerJoinWindowStore.get());
}
}
// Time-shared between joins to keep track of the maximum stream time // Time-shared between joins to keep track of the maximum stream time
final TimeTrackerSupplier sharedTimeTrackerSupplier = new TimeTrackerSupplier(); final TimeTrackerSupplier sharedTimeTrackerSupplier = new TimeTrackerSupplier();
@ -261,4 +270,12 @@ class KStreamImplJoin {
valueSerde valueSerde
)); ));
} }
private void addInternalResourceName(final StoreFactory windowStore) {
final InternalResourcesNaming.Builder thisInternalResourcesNaming = InternalResourcesNaming.builder().withStateStore(windowStore.storeName());
if (windowStore.loggingEnabled()) {
thisInternalResourcesNaming.withChangelogTopic(windowStore.storeName() + "-changelog");
}
builder.internalTopologyBuilder().addImplicitInternalNames(thisInternalResourcesNaming.build());
}
} }


@ -66,6 +66,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier; import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
@ -549,8 +550,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
final SuppressedInternal<K> suppressedInternal = buildSuppress(suppressed, name); final SuppressedInternal<K> suppressedInternal = buildSuppress(suppressed, name);
final String storeName = final String storeName;
suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME); if (suppressedInternal.name() != null) {
storeName = suppressedInternal.name() + "-store";
} else {
storeName = builder.newStoreName(SUPPRESS_NAME);
if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
internalTopologyBuilder().addImplicitInternalNames(InternalResourcesNaming.builder().withChangelogTopic(storeName + "-changelog").build());
}
}
final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> storeBuilder; final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> storeBuilder;
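
For reference, the existing Suppressed#withName hook is what supplies
suppressedInternal.name(), so the buffer store becomes "&lt;name&gt;-store" per the
branch above and no implicit name is recorded; a sketch (illustrative name,
assumes a KTable table in scope):

    // Explicitly named suppression: store name becomes "orders-suppress-store".
    table.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10),
            Suppressed.BufferConfig.maxRecords(1000L)).withName("orders-suppress"));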


@ -21,6 +21,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.state.DslStoreSuppliers; import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.StoreSupplier; import org.apache.kafka.streams.state.StoreSupplier;
@ -53,6 +54,13 @@ public final class MaterializedInternal<K, V, S extends StateStore> extends Mate
queryable = forceQueryable || storeName() != null;
if (storeName() == null && nameProvider != null) {
storeName = nameProvider.newStoreName(generatedStorePrefix);
if (nameProvider instanceof InternalStreamsBuilder) {
final InternalResourcesNaming.Builder internalResourcesNaming = InternalResourcesNaming.builder().withStateStore(storeName);
if (loggingEnabled()) {
internalResourcesNaming.withChangelogTopic(storeName + "-changelog");
}
((InternalStreamsBuilder) nameProvider).internalTopologyBuilder().addImplicitInternalNames(internalResourcesNaming.build());
}
}
// if the store type is not configured when creating Materialized, then try to get the topologyConfigs from nameProvider
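Conversely, supplying a store name up front keeps the branch above from firing. A minimal sketch (the topics "words" and "word-count-output" are assumptions) in which both the repartition topic and the store/changelog pair are user-named:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;

public class NamedMaterializedSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("words")
            .groupBy((k, v) -> v, Grouped.as("words-by-value"))
            // Materialized.as(...) means storeName() != null, so no implicit
            // name is recorded for the store or its changelog.
            .count(Materialized.as("word-counts"))
            .toStream()
            .to("word-count-output");
        builder.build();
    }
}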

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
public final class InternalResourcesNaming {
private final String repartitionTopic;
private final String changelogTopic;
private final String stateStore;
private InternalResourcesNaming(final Builder builder) {
this.repartitionTopic = builder.repartitionTopic;
this.changelogTopic = builder.changelogTopic;
this.stateStore = builder.stateStore;
}
public static Builder builder() {
return new Builder();
}
public static final class Builder {
private String repartitionTopic;
private String changelogTopic;
private String stateStore;
private Builder() {}
public Builder withRepartitionTopic(final String repartitionTopic) {
this.repartitionTopic = repartitionTopic;
return this;
}
public Builder withChangelogTopic(final String changelogTopic) {
this.changelogTopic = changelogTopic;
return this;
}
public Builder withStateStore(final String stateStore) {
this.stateStore = stateStore;
return this;
}
public InternalResourcesNaming build() {
return new InternalResourcesNaming(this);
}
}
public String repartitionTopic() {
return repartitionTopic;
}
public String changelogTopic() {
return changelogTopic;
}
public String stateStore() {
return stateStore;
}
}
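A short sketch of this value class's intended use, mirroring the call sites above; the store name shown is just an example of an auto-generated one:

import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;

public class InternalResourcesNamingSketch {
    public static void main(final String[] args) {
        // Record a generated store name together with its derived changelog topic.
        final InternalResourcesNaming naming = InternalResourcesNaming.builder()
            .withStateStore("KSTREAM-AGGREGATE-STATE-STORE-0000000003")
            .withChangelogTopic("KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog")
            .build();
        System.out.println(naming.stateStore() + " -> " + naming.changelogTopic());
        // A DSL builder would then hand this to InternalTopologyBuilder#addImplicitInternalNames.
    }
}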

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.TopologyException;
@ -53,6 +54,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -64,12 +66,14 @@ import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
public class InternalTopologyBuilder {
public InternalTopologyBuilder() {
this.topologyName = null;
this.ensureExplicitInternalResourceNaming = false;
this.processorWrapper = new NoOpProcessorWrapper();
}
@ -78,7 +82,7 @@ public class InternalTopologyBuilder {
this.topologyConfigs = topologyConfigs;
this.topologyName = topologyConfigs.topologyName;
this.ensureExplicitInternalResourceNaming = topologyConfigs.ensureExplicitInternalResourceNaming;
try {
processorWrapper = topologyConfigs.getConfiguredInstance(
PROCESSOR_WRAPPER_CLASS_CONFIG,
@ -194,6 +198,10 @@ public class InternalTopologyBuilder {
private boolean hasPersistentStores = false;
private final boolean ensureExplicitInternalResourceNaming;
private final Set<InternalResourcesNaming> implicitInternalNames = new LinkedHashSet<>();
public static class ReprocessFactory<KIn, VIn, KOut, VOut> {
private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
@ -2293,4 +2301,46 @@ public class InternalTopologyBuilder {
processorWrapper.wrapProcessorSupplier(name, processorSupplier)
);
}
public void addImplicitInternalNames(final InternalResourcesNaming internalResourcesNaming) {
implicitInternalNames.add(internalResourcesNaming);
}
public void checkUnprovidedNames() {
if (!implicitInternalNames.isEmpty()) {
final StringBuilder result = new StringBuilder();
final List<String> changelogTopics = new ArrayList<>();
final List<String> stateStores = new ArrayList<>();
final List<String> repartitionTopics = new ArrayList<>();
for (final InternalResourcesNaming internalResourcesNaming : implicitInternalNames) {
if (!Utils.isBlank(internalResourcesNaming.changelogTopic())) {
changelogTopics.add(internalResourcesNaming.changelogTopic());
}
if (!Utils.isBlank(internalResourcesNaming.stateStore())) {
stateStores.add(internalResourcesNaming.stateStore());
}
if (!Utils.isBlank(internalResourcesNaming.repartitionTopic())) {
repartitionTopics.add(internalResourcesNaming.repartitionTopic());
}
}
if (!changelogTopics.isEmpty()) {
result.append(String.format("Following changelog topic(s) has not been named: %s%n", String.join(", ", changelogTopics)));
}
if (!stateStores.isEmpty()) {
result.append(String.format("Following state store(s) has not been named: %s%n", String.join(", ", stateStores)));
}
if (!repartitionTopics.isEmpty()) {
result.append(String.format("Following repartition topic(s) has not been named: %s%n", String.join(", ", repartitionTopics)));
}
if (ensureExplicitInternalResourceNaming) {
throw new TopologyException(result.toString());
} else {
log.warn("Explicit naming for internal resources is currently disabled. If you want to enforce" +
" user-defined names for all internal resources, set " + ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG +
" to true. Note: Changing internal resource names may require a full streams application reset for an" +
" already deployed application. Consult the documentation on naming operators for more details. {}", result);
}
}
}
}
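To see checkUnprovidedNames() in action, a minimal sketch that enables the new config and builds a topology with unnamed internals; build() then throws a TopologyException listing every auto-generated name. The application id, bootstrap servers, and topic names are assumptions.

import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;

public class EnforcedNamingSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "naming-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
        final StreamsBuilder builder =
            new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
        builder.<String, String>stream("input")
            .groupBy((k, v) -> v)   // auto-named repartition topic
            .count()                // auto-named store and changelog
            .toStream()
            .to("output");
        builder.build();            // throws TopologyException listing the names
    }
}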

View File

@ -22,9 +22,11 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Topology.AutoOffsetReset;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
@ -35,12 +37,14 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
@ -54,6 +58,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemorySessionStore;
@ -90,6 +95,7 @@ import java.util.Set;
import java.util.regex.Pattern;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
@ -103,6 +109,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -2354,6 +2361,552 @@ public class StreamsBuilderTest {
assertThrows(TopologyException.class, builder::build);
}
@Test
void shouldThrowWhenGroupByAggregationWithRepartitionNameAndLoggingEnabled() {
final StreamsBuilder builder = buildWithGroupByAggregationTopology(
Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
Materialized.with(Serdes.String(), Serdes.Long())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
@Test
void shouldThrowWhenGroupByAggregationWithRepartitionNameAndLoggingDisabled() {
final StreamsBuilder builder = buildWithGroupByAggregationTopology(
Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), Serdes.Long())
.withLoggingDisabled()
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
@Test
void shouldNotThrowWhenGroupByAggregationWithMaterializedName() {
final StreamsBuilder builder = buildWithGroupByAggregationTopology(
Grouped.with(Serdes.String(), Serdes.String()),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("materialized-name")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldNotThrowWhenGroupByAggregationWithRepartitionNameAndMaterialized() {
final StreamsBuilder builder = buildWithGroupByAggregationTopology(
Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("materialized-name")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldThrowWhenGroupByAggregationWithoutRepartitionNameAndMaterializedName() {
final StreamsBuilder builder = buildWithGroupByAggregationTopology(
Grouped.with(Serdes.String(), Serdes.String()),
Materialized.with(Serdes.String(), Serdes.Long())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition"));
}
private StreamsBuilder buildWithGroupByAggregationTopology(final Grouped<String, String> grouped,
final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized) {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
final KStream<String, String> stream = builder.stream("input1");
stream
.groupBy((k, v) -> v, grouped)
.count(materialized)
.toStream()
.to("output", Produced.as("sink"));
return builder;
}
@Test
void shouldThrowWhenGroupByKeyAggregationWithRepartitionNameAndLoggingEnabled() {
final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
Materialized.with(Serdes.String(), Serdes.Long())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
@Test
void shouldThrowWhenGroupByKeyAggregationWithRepartitionNameAndLoggingDisabled() {
final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), Serdes.Long())
.withLoggingDisabled()
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
@Test
void shouldNotThrowWhenGroupByKeyAggregationWithMaterializedName() {
final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
Grouped.with(Serdes.String(), Serdes.String()),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("materialized-name")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldNotThrowWhenGroupByKeyAggregationWithRepartitionNameAndMaterializedName() {
final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("materialized-name")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldThrowWhenGroupByKeyAggregationWithoutRepartitionNameAndMaterializedName() {
final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
Grouped.with(Serdes.String(), Serdes.String()),
Materialized.with(Serdes.String(), Serdes.Long())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition"));
}
private StreamsBuilder buildWithGroupByKeyAggregationTopology(final Grouped<String, String> grouped,
final Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized) {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
final KStream<String, String> stream = builder.stream("input1");
stream
.selectKey((k, v) -> v)
.groupByKey(grouped)
.count(materialized)
.toStream()
.to("output", Produced.as("sink"));
return builder;
}
@Test
void shouldNotThrowWhenSuppressWithSuppressName() {
final StreamsBuilder builder = buildAggregationWithSuppressTopology(true, true);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldThrowWhenSuppressWithoutSuppressName() {
final StreamsBuilder builder = buildAggregationWithSuppressTopology(false, true);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KTABLE-SUPPRESS-STATE-STORE-0000000003-changelog"));
assertFalse(e.getMessage().contains("Following state store(s) has not been named"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
@Test
void shouldNotThrowWhenSuppressWithoutSuppressNameAndLoggingDisabled() {
final StreamsBuilder builder = buildAggregationWithSuppressTopology(false, false);
assertBuildDoesNotThrow(builder);
}
private StreamsBuilder buildAggregationWithSuppressTopology(final boolean isSuppressNamed,
final boolean isLoggingEnabled) {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
final KStream<String, String> stream = builder.stream("input1");
final KTable<Windowed<String>, Long> table = stream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.count(Materialized.as("materialized-name"));
if (isSuppressNamed) {
table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
.withName("suppressed-name"))
.toStream()
.to("output", Produced.as("sink"));
} else {
if (isLoggingEnabled) {
table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.to("output", Produced.as("sink"));
} else {
table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled()))
.toStream()
.to("output", Produced.as("sink"));
}
}
return builder;
}
@Test
void shouldThrowWhenKStreamKStreamJoinWithRepartitionNameAndLoggingEnabled() {
final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
.withName("repartition-name")
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-JOINTHIS-0000000012-store-changelog, KSTREAM-OUTEROTHER-0000000013-store-changelog, KSTREAM-OUTERSHARED-0000000012-store-changelog"));
assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-JOINTHIS-0000000012-store, KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
@Test
void shouldThrowWhenKStreamKStreamJoinWithRepartitionNameAndLoggingDisabled() {
final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
.withName("repartition-name").withLoggingDisabled()
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-JOINTHIS-0000000012-store, KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
@Test
void shouldThrowWhenKStreamKStreamJoinWithMaterializedName() {
final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
.withStoreName("store-name")
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertFalse(e.getMessage().contains("Following changelog topic(s) has not been named"));
assertFalse(e.getMessage().contains("Following state store(s) has not been named"));
assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-KEY-SELECT-0000000002-repartition, KSTREAM-KEY-SELECT-0000000003-repartition"));
}
@Test
void shouldNotThrowWhenKStreamKStreamJoinWithRepartitionNameAndMaterializedName() {
final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
.withName("repartition-name")
.withStoreName("store-name")
);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldThrowWhenKStreamKStreamJoinWithoutRepartitionNameAndMaterializedName() {
final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-JOINTHIS-0000000012-store-changelog, KSTREAM-OUTEROTHER-0000000013-store-changelog, KSTREAM-OUTERSHARED-0000000012-store-changelog"));
assertTrue(e.getMessage().contains("Following state store(s) has not been named: KSTREAM-JOINTHIS-0000000012-store, KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store"));
assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-KEY-SELECT-0000000002-repartition, KSTREAM-KEY-SELECT-0000000003-repartition"));
}
private StreamsBuilder buildKStreamKStreamJoinTopology(final StreamJoined<String, String, String> streamJoined) {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
streamOne
.selectKey((k, v) -> v)
.leftJoin(
streamTwo.selectKey((k, v) -> v),
(value1, value2) -> value1,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
streamJoined
);
return builder;
}
@Test
void shouldThrowWhenKStreamKTableJoinWithRepartitionNameAndLoggingEnabled() {
final StreamsBuilder builder = buildKStreamKTableJoinTopology(
Joined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName("repartition-name"),
Materialized.with(Serdes.String(), Serdes.String())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: stream-topic-two-STATE-STORE-0000000001-changelog"));
assertTrue(e.getMessage().contains("Following state store(s) has not been named: stream-topic-two-STATE-STORE-0000000001"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
@Test
void shouldThrowWhenKStreamKTableJoinWithRepartitionNameAndLoggingDisabled() {
final StreamsBuilder builder = buildKStreamKTableJoinTopology(
Joined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName("repartition-name"),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), Serdes.String())
.withLoggingDisabled()
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following state store(s) has not been named: stream-topic-two-STATE-STORE-0000000001"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
@Test
void shouldThrowWhenKStreamKTableJoinWithMaterializedName() {
final StreamsBuilder builder = buildKStreamKTableJoinTopology(
Joined.with(Serdes.String(), Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("materialized-name")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertFalse(e.getMessage().contains("Following changelog topic(s) has not been named"));
assertFalse(e.getMessage().contains("Following state store(s) has not been named"));
assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-KEY-SELECT-0000000003-repartition"));
}
@Test
void shouldNotThrowWhenKStreamKTableJoinWithRepartitionNameAndMaterializedName() {
final StreamsBuilder builder = buildKStreamKTableJoinTopology(
Joined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName("repartition-name"),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("materialized-name")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldThrowWhenKStreamKTableJoinWithoutRepartitionNameAndMaterializedName() {
final StreamsBuilder builder = buildKStreamKTableJoinTopology(
Joined.with(Serdes.String(), Serdes.String(), Serdes.String()),
Materialized.with(Serdes.String(), Serdes.String())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: stream-topic-two-STATE-STORE-0000000001-changelog"));
assertTrue(e.getMessage().contains("Following state store(s) has not been named: stream-topic-two-STATE-STORE-0000000001"));
assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-KEY-SELECT-0000000004-repartition"));
}
private StreamsBuilder buildKStreamKTableJoinTopology(final Joined<String, String, String> joined,
final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
final KStream<String, String> stream = builder.stream(STREAM_TOPIC);
final KTable<String, String> table = builder.table(STREAM_TOPIC_TWO, materialized);
stream
.selectKey((k, v) -> v)
.join(
table,
(value1, value2) -> value1,
joined
);
return builder;
}
@Test
void shouldNotThrowWhenKStreamVersionedKTableJoinWithRepartitionName() {
final StreamsBuilder builder = buildKStreamVersionedKTableJoinTopology(
Joined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName("repartition-name")
);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldThrowWhenKStreamVersionedKTableJoinWithoutRepartitionName() {
final StreamsBuilder builder = buildKStreamVersionedKTableJoinTopology(
Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-JOIN-0000000007-Buffer-changelog"));
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: KSTREAM-JOIN-0000000007-Buffer"));
assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-KEY-SELECT-0000000003-repartition"));
}
private StreamsBuilder buildKStreamVersionedKTableJoinTopology(final Joined<String, String, String> joined) {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
final VersionedBytesStoreSupplier versionedStoreSupplier =
Stores.persistentVersionedKeyValueStore("versioned-ktable-store",
Duration.ofDays(1));
final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.<String, String>as(versionedStoreSupplier)
.withKeySerde(Serdes.String()).withValueSerde(Serdes.String());
final KStream<String, String> stream = builder.stream(STREAM_TOPIC);
final KTable<String, String> table = builder.table(STREAM_TOPIC_TWO, materialized);
stream
.selectKey((k, v) -> v)
.join(
table,
(value1, value2) -> value1,
joined.withGracePeriod(Duration.ofHours(1))
)
.to("test-topic");
return builder;
}
@Test
void shouldNotThrowWhenKStreamGlobalKTableJoinWithMaterializedName() {
final StreamsBuilder builder = buildKStreamGlobalKTableJoinTopology(
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("materialized-name")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldThrowWhenKStreamGlobalKTableJoinWithoutStoreName() {
final StreamsBuilder builder = buildKStreamGlobalKTableJoinTopology(null);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertFalse(e.getMessage().contains("Following changelog topic(s) has not been named"));
assertTrue(e.getMessage().contains("Following state store(s) has not been named: stream-topic-two-STATE-STORE-0000000001"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
private StreamsBuilder buildKStreamGlobalKTableJoinTopology(final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
final KStream<String, String> stream = builder.stream(STREAM_TOPIC);
final GlobalKTable<String, String> globalTable;
if (materialized != null) {
globalTable = builder.globalTable(STREAM_TOPIC_TWO, materialized);
} else {
globalTable = builder.globalTable(STREAM_TOPIC_TWO);
}
stream
.selectKey((k, v) -> v)
.join(
globalTable,
(k, v) -> k,
(value1, value2) -> value1
);
return builder;
}
@Test
void shouldNotThrowWhenRepartitionWithRepartitionName() {
final StreamsBuilder builder = buildRepartitionTopology(
Repartitioned.with(Serdes.String(), Serdes.String())
.withName("repartition-name")
);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldThrowWhenRepartitionWithoutRepartitionName() {
final StreamsBuilder builder = buildRepartitionTopology(
Repartitioned.with(Serdes.String(), Serdes.String())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertFalse(e.getMessage().contains("Following changelog topic(s) has not been named"));
assertFalse(e.getMessage().contains("Following state store(s) has not been named"));
assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: KSTREAM-REPARTITION-0000000001-repartition"));
}
private StreamsBuilder buildRepartitionTopology(final Repartitioned<String, String> repartitioned) {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
final KStream<String, String> stream = builder.stream("input1");
stream
.repartition(repartitioned)
.to("output", Produced.as("sink"));
return builder;
}
@Test
void shouldThrowWhenCoGroupWithRepartitionNameAndLoggingEnabled() {
final StreamsBuilder builder = buildCoGroupTopology(
Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
Materialized.with(Serdes.String(), Serdes.String())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
assertTrue(e.getMessage().contains("Following state store(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
@Test
void shouldThrowWhenCoGroupWithRepartitionNameAndLoggingDisabled() {
final StreamsBuilder builder = buildCoGroupTopology(
Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(Serdes.String(), Serdes.String())
.withLoggingDisabled()
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following state store(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003"));
assertFalse(e.getMessage().contains("Following repartition topic(s) has not been named"));
}
@Test
void shouldNotThrowWhenCoGroupWithMaterializedName() {
final StreamsBuilder builder = buildCoGroupTopology(
Grouped.with(Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("materialized-name")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldNotThrowWhenCoGroupWithRepartitionNameAndMaterializedName() {
final StreamsBuilder builder = buildCoGroupTopology(
Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("materialized-name")
.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
);
assertBuildDoesNotThrow(builder);
}
@Test
void shouldThrowWhenCoGroupWithoutRepartitionNameAndMaterializedName() {
final StreamsBuilder builder = buildCoGroupTopology(
Grouped.with(Serdes.String(), Serdes.String()),
Materialized.with(Serdes.String(), Serdes.String())
);
final TopologyException e = assertThrows(TopologyException.class, builder::build);
assertTrue(e.getMessage().contains("Following changelog topic(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
assertTrue(e.getMessage().contains("Following state store(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003"));
assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition"));
}
private StreamsBuilder buildCoGroupTopology(final Grouped<String, String> grouped,
final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
final Map<Object, Object> props = dummyStreamsConfigMap();
props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
final KStream<String, String> streamTwo = builder.stream(STREAM_TOPIC_TWO);
final KGroupedStream<String, String> groupedOne = streamOne.groupBy((k, v) -> v, grouped);
final KGroupedStream<String, String> groupedTwo = streamTwo.groupByKey();
final Aggregator<String, String, String> agg1 = (key, value, aggregate) -> aggregate + value;
final Aggregator<String, String, String> agg2 = (key, value, aggregate) -> aggregate + value;
final KTable<String, String> coGroupedStream = groupedOne
.cogroup(agg1)
.cogroup(groupedTwo, agg2)
.aggregate(() -> "", materialized);
coGroupedStream.toStream().to("output");
return builder;
}
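Taken together, the tests above imply one rule: give every repartition topic, state store, and changelog an explicit name. A consolidated sketch (all topic and operator names are illustrative assumptions) that passes the check for each resource type:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.StreamJoined;

public class FullyNamedTopologySketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> left = builder.stream("left");
        final KStream<String, String> right = builder.stream("right");

        // Explicit repartition topic via Repartitioned.as(...)
        left.repartition(Repartitioned.as("left-rekeyed"))
            // groupBy's repartition topic named via Grouped; store and
            // changelog named via Materialized.as(...)
            .groupBy((k, v) -> v, Grouped.as("left-by-value"))
            .count(Materialized.as("left-counts"));

        // Join window stores and changelogs named via StreamJoined
        left.join(
            right,
            (l, r) -> l + r,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1)),
            StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
                .withName("left-right-join")
                .withStoreName("left-right-join-store"));
        builder.build();
    }
}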
private static void assertBuildDoesNotThrow(final StreamsBuilder builder) {
try {
builder.build();

View File

@ -69,6 +69,7 @@ import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.ENABLE_METRICS_PUSH_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH;
@ -129,6 +130,7 @@ public class StreamsConfigTest {
case "DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC": case "DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC":
case "DSL_STORE_SUPPLIERS_CLASS_DOC": case "DSL_STORE_SUPPLIERS_CLASS_DOC":
case "PROCESSOR_WRAPPER_CLASS_DOC": case "PROCESSOR_WRAPPER_CLASS_DOC":
case "ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC":
continue;
// check for leaking, but already deprecated members
@ -1582,6 +1584,18 @@ public class StreamsConfigTest {
assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, streamsConfig.productionExceptionHandler().getClass());
}
@Test
public void shouldGetDefaultEnsureExplicitInternalResourceNaming() {
assertFalse(streamsConfig.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG));
}
@Test
public void shouldEnsureExplicitInternalResourceNaming() {
props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
streamsConfig = new StreamsConfig(props);
assertTrue(streamsConfig.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG));
}
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {