mirror of https://github.com/apache/kafka.git
KAFKA-16328 Remove Deprecated config from StreamsConfig (#16805)
- StreamsConfig#RETRIES_CONFIG was deprecated in AK 2.7 and is no longer in use. - StreamsConfig#DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS and - StreamsConfig#DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS were deprecated in AK 3.0. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
0a4a12fbc4
commit
6f18f6f335
|
@ -75,8 +75,6 @@ settings.put(... , ...);</code></pre>
|
|||
<li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li>
|
||||
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
|
||||
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
|
||||
<li><a class="reference internal" href="#default-windowed-key-serde-inner" id="id32">default.windowed.key.serde.inner (deprecated) </a></li>
|
||||
<li><a class="reference internal" href="#default-windowed-value-serde-inner" id="id33">default.windowed.value.serde.inner (deprecated) </a></li>
|
||||
<li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li>
|
||||
<li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li>
|
||||
<li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li>
|
||||
|
@ -201,7 +199,7 @@ settings.put(... , ...);</code></pre>
|
|||
<div><p>The number of acknowledgments that the leader must have received before considering a request complete. This controls
|
||||
the durability of records that are sent. The possible values are:</p>
|
||||
<ul class="simple">
|
||||
<li><code class="docutils literal"><span class="pre">acks=0</span></code> The producer does not wait for acknowledgment from the server and the record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the <code class="docutils literal"><span class="pre">retries</span></code> configuration will not take effect (as the client won’t generally know of any failures). The offset returned for each record will always be set to <code class="docutils literal"><span class="pre">-1</span></code>.</li>
|
||||
<li><code class="docutils literal"><span class="pre">acks=0</span></code> The producer does not wait for acknowledgment from the server and the record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the producer won’t generally know of any failures. The offset returned for each record will always be set to <code class="docutils literal"><span class="pre">-1</span></code>.</li>
|
||||
<li><code class="docutils literal"><span class="pre">acks=1</span></code> The leader writes the record to its local log and responds without waiting for full acknowledgement from all followers. If the leader immediately fails after acknowledging the record, but before the followers have replicated it, then the record will be lost.</li>
|
||||
<li><code class="docutils literal"><span class="pre">acks=all</span></code> The leader waits for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost if there is at least one in-sync replica alive. This is the strongest available guarantee.</li>
|
||||
</ul>
|
||||
|
@ -310,16 +308,6 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
|
|||
set by the user or all serdes must be passed in explicitly (see also default.key.serde).</td>
|
||||
<td><code class="docutils literal"><span class="pre">null</span></code></td>
|
||||
</tr>
|
||||
<tr class="row-even"><td>default.windowed.key.serde.inner (Deprecated. Use windowed.inner.class.serde instead.)</td>
|
||||
<td>Medium</td>
|
||||
<td colspan="2">Default serializer/deserializer for the inner class of windowed keys, implementing the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td>
|
||||
<td><code class="docutils literal"><span class="pre">null</span></code></td>
|
||||
</tr>
|
||||
<tr class="row-odd"><td>default.windowed.value.serde.inner (Deprecated. Use windowed.inner.class.serde instead.)</td>
|
||||
<td>Medium</td>
|
||||
<td colspan="2">Default serializer/deserializer for the inner class of windowed values, implementing the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td>
|
||||
<td><code class="docutils literal"><span class="pre">null</span></code></td>
|
||||
</tr>
|
||||
<tr class="row-even"><td>default.dsl.store</td>
|
||||
<td>Low</td>
|
||||
<td colspan="2">
|
||||
|
@ -428,7 +416,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
|
|||
</tr>
|
||||
<tr class="row-odd"><td>retry.backoff.ms</td>
|
||||
<td>Medium</td>
|
||||
<td colspan="2">The amount of time in milliseconds, before a request is retried. This applies if the <code class="docutils literal"><span class="pre">retries</span></code> parameter is configured to be greater than 0. </td>
|
||||
<td colspan="2">The amount of time in milliseconds, before a request is retried.</td>
|
||||
<td><code class="docutils literal"><span class="pre">100</span></code></td>
|
||||
</tr>
|
||||
<tr class="row-even"><td>rocksdb.config.setter</td>
|
||||
|
@ -701,31 +689,6 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
|
|||
<p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
|
||||
</div></blockquote>
|
||||
</div>
|
||||
<div class="section" id="default-windowed-key-serde-inner">
|
||||
<h4><a class="toc-backref" href="#id32">default.windowed.key.serde.inner</a><a class="headerlink" href="#default-windowed-key-serde-inner" title="Permalink to this headline"></a> (Deprecated.)</h4>
|
||||
<blockquote>
|
||||
<div><p>The default Serializer/Deserializer class for the inner class of windowed keys. Serialization and deserialization in Kafka Streams happens
|
||||
whenever data needs to be materialized, for example:</p>
|
||||
<div><ul class="simple">
|
||||
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
|
||||
<li>Whenever data is read from or written to a <em>state store</em>.</li>
|
||||
</ul>
|
||||
<p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
|
||||
</div>
|
||||
</div></blockquote>
|
||||
</div>
|
||||
<div class="section" id="default-windowed-value-serde-inner">
|
||||
<h4><a class="toc-backref" href="#id33">default.windowed.value.serde.inner</a><a class="headerlink" href="#default-windowed-value-serde-inner" title="Permalink to this headline"></a>(Deprecated.)</h4>
|
||||
<blockquote>
|
||||
<div><p>The default Serializer/Deserializer class for the inner class of windowed values. Serialization and deserialization in Kafka Streams happens
|
||||
happens whenever data needs to be materialized, for example:</p>
|
||||
<ul class="simple">
|
||||
<li>Whenever data is read from or written to a <em>Kafka topic</em> (e.g., via the <code class="docutils literal"><span class="pre">StreamsBuilder#stream()</span></code> and <code class="docutils literal"><span class="pre">KStream#to()</span></code> methods).</li>
|
||||
<li>Whenever data is read from or written to a <em>state store</em>.</li>
|
||||
</ul>
|
||||
<p>This is discussed in more detail in <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data types and serialization</span></a>.</p>
|
||||
</div></blockquote>
|
||||
</div>
|
||||
<div class="section" id="rack-aware-assignment-non-overlap-cost">
|
||||
<h4><a class="toc-backref" href="#id37">rack.aware.assignment.non_overlap_cost</a><a class="headerlink" href="#rack-aware-assignment-non-overlap-cost" title="Permalink to this headline"></a></h4>
|
||||
<blockquote>
|
||||
|
@ -1167,8 +1130,7 @@ streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);</code></pr
|
|||
<div class="section" id="naming">
|
||||
<h4><a class="toc-backref" href="#id17">Naming</a><a class="headerlink" href="#naming" title="Permalink to this headline"></a></h4>
|
||||
<p>Some consumer, producer and admin client configuration parameters use the same parameter name, and Kafka Streams library itself also uses some parameters that share the same name with its embedded client. For example, <code class="docutils literal"><span class="pre">send.buffer.bytes</span></code> and
|
||||
<code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <code class="docutils literal"><span class="pre">request.timeout.ms</span></code> and <code class="docutils literal"><span class="pre">retry.backoff.ms</span></code> control retries for client request;
|
||||
<code class="docutils literal"><span class="pre">retries</span></code> are used to configure how many retries are allowed when handling retriable errors from broker request responses.
|
||||
<code class="docutils literal"><span class="pre">receive.buffer.bytes</span></code> are used to configure TCP buffers; <code class="docutils literal"><span class="pre">request.timeout.ms</span></code> and <code class="docutils literal"><span class="pre">retry.backoff.ms</span></code> control retries for client request.
|
||||
You can avoid duplicate names by prefix parameter names with <code class="docutils literal"><span class="pre">consumer.</span></code>, <code class="docutils literal"><span class="pre">producer.</span></code>, or <code class="docutils literal"><span class="pre">admin.</span></code> (e.g., <code class="docutils literal"><span class="pre">consumer.send.buffer.bytes</span></code> and <code class="docutils literal"><span class="pre">producer.send.buffer.bytes</span></code>).</p>
|
||||
<pre class="line-numbers"><code class="language-java">Properties streamsSettings = new Properties();
|
||||
// same value for consumer, producer, and admin client
|
||||
|
|
|
@ -71,7 +71,6 @@ import java.util.stream.Stream;
|
|||
import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
|
||||
import static org.apache.kafka.common.config.ConfigDef.ListSize.atMostOfSize;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
|
||||
import static org.apache.kafka.common.config.ConfigDef.Range.between;
|
||||
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
|
||||
import static org.apache.kafka.common.config.ConfigDef.parseType;
|
||||
|
||||
|
@ -588,22 +587,6 @@ public class StreamsConfig extends AbstractConfig {
|
|||
static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
|
||||
static final Class<?> DSL_STORE_SUPPLIERS_CLASS_DEFAULT = BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
|
||||
|
||||
/** {@code default.windowed.key.serde.inner}
|
||||
* @deprecated since 3.0.0 Use {@link #WINDOWED_INNER_CLASS_SERDE "windowed.inner.class.serde"} instead. */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
@Deprecated
|
||||
public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";
|
||||
private static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed key. Must implement the " +
|
||||
"<code>org.apache.kafka.common.serialization.Serde</code> interface.";
|
||||
|
||||
/** {@code default.windowed.value.serde.inner}
|
||||
* @deprecated since 3.0.0 Use {@link #WINDOWED_INNER_CLASS_SERDE "windowed.inner.class.serde"} instead. */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
@Deprecated
|
||||
public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
|
||||
private static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed value. Must implement the " +
|
||||
"<code>org.apache.kafka.common.serialization.Serde</code> interface.";
|
||||
|
||||
public static final String WINDOWED_INNER_CLASS_SERDE = "windowed.inner.class.serde";
|
||||
private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " +
|
||||
"<code>org.apache.kafka.common.serialization.Serde</code> interface. Note that setting this config in KafkaStreams application would result " +
|
||||
|
@ -612,16 +595,12 @@ public class StreamsConfig extends AbstractConfig {
|
|||
/** {@code default key.serde} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
|
||||
private static final String DEFAULT_KEY_SERDE_CLASS_DOC = "Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. "
|
||||
+ "Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '"
|
||||
+ DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "' or '" + DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS + "' as well";
|
||||
private static final String DEFAULT_KEY_SERDE_CLASS_DOC = "Default serializer / deserializer class for key that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. ";
|
||||
|
||||
/** {@code default value.serde} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
|
||||
private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. "
|
||||
+ "Note when windowed serde class is used, one needs to set the inner serde class that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface via '"
|
||||
+ DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS + "' or '" + DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS + "' as well";
|
||||
private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the <code>org.apache.kafka.common.serialization.Serde</code> interface. ";
|
||||
|
||||
/** {@code default.timestamp.extractor} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
|
@ -739,17 +718,6 @@ public class StreamsConfig extends AbstractConfig {
|
|||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
|
||||
|
||||
/**
|
||||
* {@code retries}
|
||||
* <p>
|
||||
* This config is ignored by Kafka Streams. Note, that the internal clients (producer, admin) are still impacted by this config.
|
||||
*
|
||||
* @deprecated since 2.7
|
||||
*/
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
@Deprecated
|
||||
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
|
||||
|
||||
/** {@code retry.backoff.ms} */
|
||||
@SuppressWarnings("WeakerAccess")
|
||||
public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
|
||||
|
@ -1160,12 +1128,6 @@ public class StreamsConfig extends AbstractConfig {
|
|||
atLeast(0L),
|
||||
Importance.LOW,
|
||||
CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_DOC)
|
||||
.define(RETRIES_CONFIG,
|
||||
Type.INT,
|
||||
0,
|
||||
between(0, Integer.MAX_VALUE),
|
||||
Importance.LOW,
|
||||
CommonClientConfigs.RETRIES_DOC)
|
||||
.define(RETRY_BACKOFF_MS_CONFIG,
|
||||
Type.LONG,
|
||||
100L,
|
||||
|
@ -1466,10 +1428,6 @@ public class StreamsConfig extends AbstractConfig {
|
|||
"Please use `{}` instead.", EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2);
|
||||
}
|
||||
|
||||
if (props.containsKey(RETRIES_CONFIG)) {
|
||||
log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release.", RETRIES_CONFIG);
|
||||
}
|
||||
|
||||
if (eosEnabled) {
|
||||
verifyEOSTransactionTimeoutCompatibility();
|
||||
}
|
||||
|
|
|
@ -1129,23 +1129,6 @@ public class StreamsConfigTest {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Test
|
||||
public void shouldLogWarningWhenRetriesIsUsed() {
|
||||
props.put(StreamsConfig.RETRIES_CONFIG, 0);
|
||||
|
||||
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
|
||||
appender.setClassLogger(StreamsConfig.class, Level.DEBUG);
|
||||
new StreamsConfig(props);
|
||||
|
||||
assertThat(
|
||||
appender.getMessages(),
|
||||
hasItem("Configuration parameter `" + StreamsConfig.RETRIES_CONFIG +
|
||||
"` is deprecated and will be removed in the 4.0.0 release.")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldSetDefaultAcceptableRecoveryLag() {
|
||||
final StreamsConfig config = new StreamsConfig(props);
|
||||
|
|
Loading…
Reference in New Issue