mirror of https://github.com/apache/kafka.git
MINOR: Keep Kafka Streams configs ordered in code and docs (#16816)
Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
parent
f5439864c6
commit
f69b465414
|
@ -75,6 +75,7 @@ settings.put(... , ...);</code></pre>
|
|||
<li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li>
|
||||
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
|
||||
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
|
||||
<li><a class="reference internal" href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
|
||||
<li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li>
|
||||
<li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li>
|
||||
<li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li>
|
||||
|
@ -92,7 +93,6 @@ settings.put(... , ...);</code></pre>
|
|||
<li><a class="reference internal" href="#task-assignor-class" id="id39">task.assignor.class</a></li>
|
||||
<li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
|
||||
<li><a class="reference internal" href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde</a></li>
|
||||
<li><a class="reference internal" href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
|
||||
</ul>
|
||||
</li>
|
||||
<li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a>
|
||||
|
@ -279,7 +279,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
|
|||
<tr class="row-even"><td>commit.interval.ms</td>
|
||||
<td>Low</td>
|
||||
<td colspan="2">The frequency in milliseconds with which to save the position (offsets in source topics) of tasks.</td>
|
||||
<td>30000 milliseconds</td>
|
||||
<td>30000 milliseconds (30 seconds)</td>
|
||||
</tr>
|
||||
<tr class="row-odd"><td>default.deserialization.exception.handler</td>
|
||||
<td>Medium</td>
|
||||
|
@ -325,6 +325,11 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
|
|||
</td>
|
||||
<td><code>BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers</code></td>
|
||||
</tr>
|
||||
<tr class="row-even"><td>log.summary.interval.ms</td>
|
||||
<td>Low</td>
|
||||
<td colspan="2">The output interval in milliseconds for logging summary information (disabled if negative).</td>
|
||||
<td>120000 milliseconds (2 minutes)</td>
|
||||
</tr>
|
||||
<tr class="row-odd"><td>max.task.idle.ms</td>
|
||||
<td>Medium</td>
|
||||
<td colspan="2">
|
||||
|
@ -465,11 +470,6 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
|
|||
<td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td>
|
||||
<td><code class="docutils literal"><span class="pre">null</span></code></td>
|
||||
</tr>
|
||||
<tr class="row-odd"><td>log.summary.interval.ms</td>
|
||||
<td>Low</td>
|
||||
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
|
||||
<td>120000milliseconds (2 minutes)</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
<div class="section" id="acceptable-recovery-lag">
|
||||
|
@ -781,6 +781,16 @@ rack.aware.assignment.tags: zone,cluster | rack.aware.assignment.tags: zone,cl
|
|||
</div>
|
||||
</blockquote>
|
||||
</div>
|
||||
<div class="section" id="log-summary-interval-ms">
|
||||
<h4><a class="toc-backref" href="#id40">log.summary.interval.ms</a><a class="headerlink" href="#log-summary-interval-ms" title="Permalink to this headline"></a></h4>
|
||||
<blockquote>
|
||||
<div>
|
||||
This configuration controls the output interval for summary information.
|
||||
If greater or equal to 0, the summary log will be output according to the set time interval;
|
||||
If less than 0, summary output is disabled.
|
||||
</div>
|
||||
</blockquote>
|
||||
</div>
|
||||
<div class="section" id="max-task-idle-ms">
|
||||
<span id="streams-developer-guide-max-task-idle-ms"></span><h4><a class="toc-backref" href="#id28">max.task.idle.ms</a><a class="headerlink" href="#max-task-idle-ms" title="Permalink to this headline"></a></h4>
|
||||
<blockquote>
|
||||
|
@ -1092,16 +1102,6 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
|
|||
</p>
|
||||
</div></blockquote>
|
||||
</div>
|
||||
<div class="section" id="log-summary-interval-ms">
|
||||
<h4><a class="toc-backref" href="#id40">log.summary.interval.ms</a><a class="headerlink" href="#log-summary-interval-ms" title="Permalink to this headline"></a></h4>
|
||||
<blockquote>
|
||||
<div>
|
||||
This configuration controls the output interval for summary information.
|
||||
If greater or equal to 0, the summary log will be output according to the set time interval;
|
||||
If less than 0, summary output is disabled.
|
||||
</div>
|
||||
</blockquote>
|
||||
</div>
|
||||
<div class="section" id="upgrade-from">
|
||||
<span id="streams-developer-guide-upgrade-from"></span><h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
|
||||
<blockquote>
|
||||
|
|
|
@ -243,18 +243,6 @@ public class StreamsConfig extends AbstractConfig {
|
|||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String CLIENT_TAG_PREFIX = "client.tag.";
|
||||
|
||||
/** {@code topology.optimization} */
|
||||
public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
|
||||
private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
|
||||
+ " \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
|
||||
+ "or a comma separated list of specific optimizations: "
|
||||
+ "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\" + "
|
||||
+ "\"SINGLE_STORE_SELF_JOIN+\").";
|
||||
private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
|
||||
+ "Streams if it should optimize the topology and what optimizations to apply. "
|
||||
+ CONFIG_ERROR_MSG
|
||||
+ "\"NO_OPTIMIZATION\" by default.";
|
||||
|
||||
/**
|
||||
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for disabling topology optimization
|
||||
*/
|
||||
|
@ -474,6 +462,9 @@ public class StreamsConfig extends AbstractConfig {
|
|||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String EXACTLY_ONCE_V2 = "exactly_once_v2";
|
||||
|
||||
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "none";
|
||||
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "min_traffic";
|
||||
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY = "balance_subtopology";
|
||||
/**
|
||||
* Config value for parameter {@link #BUILT_IN_METRICS_VERSION_CONFIG "built.in.metrics.version"} for the latest built-in metrics version.
|
||||
*/
|
||||
|
@ -516,23 +507,12 @@ public class StreamsConfig extends AbstractConfig {
|
|||
public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
|
||||
public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
|
||||
|
||||
/** {@code statestore.cache.max.bytes} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
|
||||
public static final String STATESTORE_CACHE_MAX_BYTES_DOC = "Maximum number of memory bytes to be used for statestore cache across all threads";
|
||||
|
||||
/** {@code client.id} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
|
||||
private static final String CLIENT_ID_DOC = "An ID prefix string used for the client IDs of internal (main, restore, and global) consumers , producers, and admin clients" +
|
||||
" with pattern <code><client.id>-[Global]StreamThread[-<threadSequenceNumber>]-<consumer|producer|restore-consumer|global-consumer></code>.";
|
||||
|
||||
/** {@code enable.metrics.push} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String ENABLE_METRICS_PUSH_CONFIG = CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG;
|
||||
public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." +
|
||||
" The cluster must have a client metrics subscription which corresponds to a client.";
|
||||
|
||||
/** {@code commit.interval.ms} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
|
||||
|
@ -542,17 +522,15 @@ public class StreamsConfig extends AbstractConfig {
|
|||
" (Note, if <code>processing.guarantee</code> is set to <code>" + EXACTLY_ONCE_V2 + "</code>, <code>" + EXACTLY_ONCE + "</code>,the default value is <code>" + EOS_DEFAULT_COMMIT_INTERVAL_MS + "</code>," +
|
||||
" otherwise the default value is <code>" + DEFAULT_COMMIT_INTERVAL_MS + "</code>.";
|
||||
|
||||
/** {@code repartition.purge.interval.ms} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String REPARTITION_PURGE_INTERVAL_MS_CONFIG = "repartition.purge.interval.ms";
|
||||
private static final String REPARTITION_PURGE_INTERVAL_MS_DOC = "The frequency in milliseconds with which to delete fully consumed records from repartition topics." +
|
||||
" Purging will occur after at least this value since the last purge, but may be delayed until later." +
|
||||
" (Note, unlike <code>commit.interval.ms</code>, the default for this value remains unchanged when <code>processing.guarantee</code> is set to <code>" + EXACTLY_ONCE_V2 + "</code>).";
|
||||
|
||||
/** {@code connections.max.idle.ms} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
|
||||
|
||||
/** {@code default.client.supplier} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
|
||||
public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
|
||||
|
||||
/** {@code default.deserialization.exception.handler} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler";
|
||||
|
@ -563,11 +541,6 @@ public class StreamsConfig extends AbstractConfig {
|
|||
public static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.production.exception.handler";
|
||||
private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code> interface.";
|
||||
|
||||
/** {@code processing.exception.handler} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler";
|
||||
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface.";
|
||||
|
||||
/** {@code default.dsl.store} */
|
||||
@Deprecated
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
|
@ -587,17 +560,12 @@ public class StreamsConfig extends AbstractConfig {
|
|||
static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
|
||||
static final Class<?> DSL_STORE_SUPPLIERS_CLASS_DEFAULT = BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
|
||||
|
||||
public static final String WINDOWED_INNER_CLASS_SERDE = "windowed.inner.class.serde";
|
||||
private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " +
|
||||
"<code>org.apache.kafka.common.serialization.Serde</code> interface. Note that setting this config in KafkaStreams application would result " +
|
||||
"in an error as it is meant to be used only from Plain consumer client.";
|
||||
|
||||
/** {@code default key.serde} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
|
||||
private static final String DEFAULT_KEY_SERDE_CLASS_DOC = "Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. ";
|
||||
|
||||
/** {@code default value.serde} */
|
||||
/** {@code default.value.serde} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
|
||||
private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. ";
|
||||
|
@ -607,6 +575,18 @@ public class StreamsConfig extends AbstractConfig {
|
|||
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
|
||||
public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code> interface.";
|
||||
|
||||
/** {@code enable.metrics.push} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String ENABLE_METRICS_PUSH_CONFIG = CommonClientConfigs.ENABLE_METRICS_PUSH_CONFIG;
|
||||
public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable pushing of internal client metrics for (main, restore, and global) consumers, producers, and admin clients." +
|
||||
" The cluster must have a client metrics subscription which corresponds to a client.";
|
||||
|
||||
/** {@code log.summary.interval.ms} */
|
||||
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
|
||||
private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output interval in milliseconds for logging summary information.\n" +
|
||||
"If greater or equal to 0, the summary log will be output according to the set time interval;\n" +
|
||||
"If less than 0, summary output is disabled.";
|
||||
|
||||
/** {@code max.task.idle.ms} */
|
||||
public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
|
||||
public static final String MAX_TASK_IDLE_MS_DOC = "This config controls whether joins and merges"
|
||||
|
@ -677,6 +657,11 @@ public class StreamsConfig extends AbstractConfig {
|
|||
private static final String PROBING_REBALANCE_INTERVAL_MS_DOC = "The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have finished warming up and are ready to become active." +
|
||||
" Probing rebalances will continue to be triggered until the assignment is balanced. Must be at least 1 minute.";
|
||||
|
||||
/** {@code processing.exception.handler} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler";
|
||||
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface.";
|
||||
|
||||
/** {@code processing.guarantee} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String PROCESSING_GUARANTEE_CONFIG = "processing.guarantee";
|
||||
|
@ -689,10 +674,31 @@ public class StreamsConfig extends AbstractConfig {
|
|||
"recommended setting for production; for development you can change this, by adjusting broker setting " +
|
||||
"<code>transaction.state.log.replication.factor</code> and <code>transaction.state.log.min.isr</code>.";
|
||||
|
||||
/** {@code repartition.purge.interval.ms} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String REPARTITION_PURGE_INTERVAL_MS_CONFIG = "repartition.purge.interval.ms";
|
||||
private static final String REPARTITION_PURGE_INTERVAL_MS_DOC = "The frequency in milliseconds with which to delete fully consumed records from repartition topics." +
|
||||
" Purging will occur after at least this value since the last purge, but may be delayed until later." +
|
||||
" (Note, unlike <code>commit.interval.ms</code>, the default for this value remains unchanged when <code>processing.guarantee</code> is set to <code>" + EXACTLY_ONCE_V2 + "</code>).";
|
||||
|
||||
/** {@code receive.buffer.bytes} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
|
||||
|
||||
/** {@code rack.aware.assignment.non_overlap_cost} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG = "rack.aware.assignment.non_overlap_cost";
|
||||
public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC = "Cost associated with moving tasks from existing assignment. This config and <code>rack.aware.assignment.traffic_cost</code> controls whether the "
|
||||
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will "
|
||||
+ "optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors.";
|
||||
|
||||
/** {@code rack.aware.assignment.strategy} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy";
|
||||
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take <code>client.rack</code> and <code>racks</code> of <code>TopicPartition</code> into account when assigning"
|
||||
+ " tasks to minimize cross rack traffic. Valid settings are : <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code> (default), which will disable rack aware assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
|
||||
+ "</code>, which will compute minimum cross rack traffic assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY + "</code>, which will compute minimum cross rack traffic and try to balance the tasks of same subtopolgies across different clients";
|
||||
|
||||
/** {@code rack.aware.assignment.tags} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String RACK_AWARE_ASSIGNMENT_TAGS_CONFIG = "rack.aware.assignment.tags";
|
||||
|
@ -700,6 +706,13 @@ public class StreamsConfig extends AbstractConfig {
|
|||
" When configured, Kafka Streams will make a best-effort to distribute" +
|
||||
" the standby tasks over each client tag dimension.";
|
||||
|
||||
/** {@code rack.aware.assignment.traffic_cost} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost";
|
||||
public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost associated with cross rack traffic. This config and <code>rack.aware.assignment.non_overlap_cost</code> controls whether the "
|
||||
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will "
|
||||
+ "optimize for minimizing cross rack traffic. The default value is null which means it will use default traffic cost values in different assignors.";
|
||||
|
||||
/** {@code reconnect.backoff.ms} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
|
||||
|
@ -745,17 +758,23 @@ public class StreamsConfig extends AbstractConfig {
|
|||
public static final String STATE_DIR_CONFIG = "state.dir";
|
||||
private static final String STATE_DIR_DOC = "Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem. Note that if not configured, then the default location will be different in each environment as it is computed using System.getProperty(\"java.io.tmpdir\")";
|
||||
|
||||
/** {@code statestore.cache.max.bytes} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
|
||||
public static final String STATESTORE_CACHE_MAX_BYTES_DOC = "Maximum number of memory bytes to be used for statestore cache across all threads";
|
||||
|
||||
/** {@code task.assignor.class} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";
|
||||
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <code>" +
|
||||
TaskAssignor.class.getName() + "</code> interface. Defaults to the <code>HighAvailabilityTaskAssignor</code> class.";
|
||||
|
||||
/** {@code task.timeout.ms} */
|
||||
public static final String TASK_TIMEOUT_MS_CONFIG = "task.timeout.ms";
|
||||
public static final String TASK_TIMEOUT_MS_DOC = "The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. " +
|
||||
"For a timeout of 0ms, a task would raise an error for the first internal error. " +
|
||||
"For any timeout larger than 0ms, a task will retry at least once before an error is raised.";
|
||||
|
||||
|
||||
/** {@code window.size.ms} */
|
||||
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
|
||||
private static final String WINDOW_SIZE_MS_DOC = "Sets window size for the deserializer in order to calculate window end times.";
|
||||
|
||||
/** {@code upgrade.from} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
|
||||
|
@ -772,48 +791,33 @@ public class StreamsConfig extends AbstractConfig {
|
|||
UPGRADE_FROM_35 + "\", \"" + UPGRADE_FROM_36 + "\", \"" + UPGRADE_FROM_37 + "\", \"" +
|
||||
UPGRADE_FROM_38 + "(for upgrading from the corresponding old version).";
|
||||
|
||||
/** {@code topology.optimization} */
|
||||
public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
|
||||
private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
|
||||
+ " \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
|
||||
+ "or a comma separated list of specific optimizations: "
|
||||
+ "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\" + "
|
||||
+ "\"SINGLE_STORE_SELF_JOIN+\").";
|
||||
private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
|
||||
+ "Streams if it should optimize the topology and what optimizations to apply. "
|
||||
+ CONFIG_ERROR_MSG
|
||||
+ "\"NO_OPTIMIZATION\" by default.";
|
||||
|
||||
/** {@code windowed.inner.class.serde} */
|
||||
public static final String WINDOWED_INNER_CLASS_SERDE = "windowed.inner.class.serde";
|
||||
private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " +
|
||||
"<code>org.apache.kafka.common.serialization.Serde</code> interface. Note that setting this config in KafkaStreams application would result " +
|
||||
"in an error as it is meant to be used only from Plain consumer client.";
|
||||
|
||||
/** {@code window.size.ms} */
|
||||
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
|
||||
private static final String WINDOW_SIZE_MS_DOC = "Sets window size for the deserializer in order to calculate window end times.";
|
||||
|
||||
/** {@code windowstore.changelog.additional.retention.ms} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms";
|
||||
private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day";
|
||||
|
||||
/** {@code default.client.supplier} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
|
||||
public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";
|
||||
|
||||
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "none";
|
||||
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "min_traffic";
|
||||
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY = "balance_subtopology";
|
||||
|
||||
/** {@code } rack.aware.assignment.strategy */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy";
|
||||
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take <code>client.rack</code> and <code>racks</code> of <code>TopicPartition</code> into account when assigning"
|
||||
+ " tasks to minimize cross rack traffic. Valid settings are : <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code> (default), which will disable rack aware assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
|
||||
+ "</code>, which will compute minimum cross rack traffic assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY + "</code>, which will compute minimum cross rack traffic and try to balance the tasks of same subtopolgies across different clients";
|
||||
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost";
|
||||
public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost associated with cross rack traffic. This config and <code>rack.aware.assignment.non_overlap_cost</code> controls whether the "
|
||||
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will "
|
||||
+ "optimize for minimizing cross rack traffic. The default value is null which means it will use default traffic cost values in different assignors.";
|
||||
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG = "rack.aware.assignment.non_overlap_cost";
|
||||
public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC = "Cost associated with moving tasks from existing assignment. This config and <code>" + RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG + "</code> controls whether the "
|
||||
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will "
|
||||
+ "optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors.";
|
||||
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String TASK_ASSIGNOR_CLASS_CONFIG = "task.assignor.class";
|
||||
private static final String TASK_ASSIGNOR_CLASS_DOC = "A task assignor class or class name implementing the <code>" +
|
||||
TaskAssignor.class.getName() + "</code> interface. Defaults to the <code>HighAvailabilityTaskAssignor</code> class.";
|
||||
|
||||
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
|
||||
private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "This configuration controls the output interval for summary information.\n" +
|
||||
"If greater or equal to 0, the summary log will be output according to the set time interval;\n" +
|
||||
"If less than 0, summary output is disabled.";
|
||||
/**
|
||||
* {@code topology.optimization}
|
||||
* @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead
|
||||
|
@ -1067,6 +1071,11 @@ public class StreamsConfig extends AbstractConfig {
|
|||
DefaultKafkaClientSupplier.class.getName(),
|
||||
Importance.LOW,
|
||||
DEFAULT_CLIENT_SUPPLIER_DOC)
|
||||
.define(LOG_SUMMARY_INTERVAL_MS_CONFIG,
|
||||
Type.LONG,
|
||||
2 * 60 * 1000L,
|
||||
Importance.LOW,
|
||||
LOG_SUMMARY_INTERVAL_MS_DOC)
|
||||
.define(METADATA_MAX_AGE_CONFIG,
|
||||
Type.LONG,
|
||||
5 * 60 * 1000L,
|
||||
|
@ -1162,17 +1171,16 @@ public class StreamsConfig extends AbstractConfig {
|
|||
Type.STRING,
|
||||
null,
|
||||
in(Stream.concat(
|
||||
Stream.of((String) null),
|
||||
Arrays.stream(UpgradeFromValues.values()).map(UpgradeFromValues::toString)
|
||||
).toArray(String[]::new)
|
||||
),
|
||||
Stream.of((String) null),
|
||||
Arrays.stream(UpgradeFromValues.values()).map(UpgradeFromValues::toString)
|
||||
).toArray(String[]::new)),
|
||||
Importance.LOW,
|
||||
UPGRADE_FROM_DOC)
|
||||
.define(WINDOWED_INNER_CLASS_SERDE,
|
||||
Type.STRING,
|
||||
null,
|
||||
Importance.LOW,
|
||||
WINDOWED_INNER_CLASS_SERDE_DOC)
|
||||
Type.STRING,
|
||||
null,
|
||||
Importance.LOW,
|
||||
WINDOWED_INNER_CLASS_SERDE_DOC)
|
||||
.define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
|
||||
Type.LONG,
|
||||
24 * 60 * 60 * 1000L,
|
||||
|
@ -1182,12 +1190,7 @@ public class StreamsConfig extends AbstractConfig {
|
|||
Type.LONG,
|
||||
null,
|
||||
Importance.LOW,
|
||||
WINDOW_SIZE_MS_DOC)
|
||||
.define(LOG_SUMMARY_INTERVAL_MS_CONFIG,
|
||||
Type.LONG,
|
||||
2 * 60 * 1000L,
|
||||
Importance.LOW,
|
||||
LOG_SUMMARY_INTERVAL_MS_DOC);
|
||||
WINDOW_SIZE_MS_DOC);
|
||||
}
|
||||
|
||||
// this is the list of configs for underlying clients
|
||||
|
|
Loading…
Reference in New Issue