mirror of https://github.com/apache/kafka.git
KAFKA-9274: Revert deprecation of `retries` for producer and admin clients (#9333)
Reviewer: John Roesler <john@confluent.io>
This commit is contained in:
parent
d1c82a9baf
commit
a15387f34d
|
@ -78,9 +78,9 @@ public class CommonClientConfigs {
|
||||||
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
|
public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
|
||||||
public static final String RECONNECT_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.";
|
public static final String RECONNECT_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.";
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
public static final String RETRIES_CONFIG = "retries";
|
public static final String RETRIES_CONFIG = "retries";
|
||||||
public static final String RETRIES_DOC = "(Deprecated) Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error.";
|
public static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error." +
|
||||||
|
" It is recommended to set the value to either zero or `MAX_VALUE` and use corresponding timeout parameters to control how long a client should retry a request.";
|
||||||
|
|
||||||
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
|
public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
|
||||||
public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
|
public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
|
||||||
|
@ -194,10 +194,4 @@ public class CommonClientConfigs {
|
||||||
CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
|
CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
|
||||||
ClientDnsLookup.USE_ALL_DNS_IPS);
|
ClientDnsLookup.USE_ALL_DNS_IPS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void warnIfDeprecatedRetriesValue(AbstractConfig config) {
|
|
||||||
if (config.originals().containsKey(RETRIES_CONFIG)) {
|
|
||||||
log.warn("Configuration '{}' is deprecated and will be removed in future version.", RETRIES_CONFIG);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -113,11 +113,6 @@ public class AdminClientConfig extends AbstractConfig {
|
||||||
private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
|
private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
|
||||||
private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
|
private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
|
||||||
|
|
||||||
/**
|
|
||||||
* <code>retries</code>
|
|
||||||
* @deprecated since 2.7
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
|
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
|
||||||
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
|
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
|
||||||
|
|
||||||
|
@ -227,7 +222,6 @@ public class AdminClientConfig extends AbstractConfig {
|
||||||
@Override
|
@Override
|
||||||
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
|
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
|
||||||
CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
|
CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
|
||||||
CommonClientConfigs.warnIfDeprecatedRetriesValue(this);
|
|
||||||
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
|
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -549,7 +549,6 @@ public class KafkaAdminClient extends AdminClient {
|
||||||
return new LogContext("[AdminClient clientId=" + clientId + "] ");
|
return new LogContext("[AdminClient clientId=" + clientId + "] ");
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
private KafkaAdminClient(AdminClientConfig config,
|
private KafkaAdminClient(AdminClientConfig config,
|
||||||
String clientId,
|
String clientId,
|
||||||
Time time,
|
Time time,
|
||||||
|
|
|
@ -441,7 +441,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
|
||||||
}
|
}
|
||||||
|
|
||||||
// visible for testing
|
// visible for testing
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
|
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
|
||||||
int maxInflightRequests = configureInflightRequests(producerConfig);
|
int maxInflightRequests = configureInflightRequests(producerConfig);
|
||||||
int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
|
int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
|
||||||
|
|
|
@ -126,7 +126,7 @@ public class ProducerConfig extends AbstractConfig {
|
||||||
+ "after a call to <code>send()</code> returns. This limits the total time that a record will be delayed "
|
+ "after a call to <code>send()</code> returns. This limits the total time that a record will be delayed "
|
||||||
+ "prior to sending, the time to await acknowledgement from the broker (if expected), and the time allowed "
|
+ "prior to sending, the time to await acknowledgement from the broker (if expected), and the time allowed "
|
||||||
+ "for retriable send failures. The producer may report failure to send a record earlier than this config if "
|
+ "for retriable send failures. The producer may report failure to send a record earlier than this config if "
|
||||||
+ "either an unrecoverable error is encountered, the retries have been exhausted (deprecated), "
|
+ "either an unrecoverable error is encountered, the retries have been exhausted, "
|
||||||
+ "or the record is added to a batch which reached an earlier delivery expiration deadline. "
|
+ "or the record is added to a batch which reached an earlier delivery expiration deadline. "
|
||||||
+ "The value of this config should be greater than or equal to the sum of <code>" + REQUEST_TIMEOUT_MS_CONFIG + "</code> "
|
+ "The value of this config should be greater than or equal to the sum of <code>" + REQUEST_TIMEOUT_MS_CONFIG + "</code> "
|
||||||
+ "and <code>" + LINGER_MS_CONFIG + "</code>.";
|
+ "and <code>" + LINGER_MS_CONFIG + "</code>.";
|
||||||
|
@ -203,13 +203,9 @@ public class ProducerConfig extends AbstractConfig {
|
||||||
+ " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of"
|
+ " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of"
|
||||||
+ " message re-ordering due to retries (i.e., if retries are enabled).";
|
+ " message re-ordering due to retries (i.e., if retries are enabled).";
|
||||||
|
|
||||||
/**
|
/** <code>retries</code> */
|
||||||
* <code>retries</code>
|
|
||||||
* @deprecated since 2.7
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
|
public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
|
||||||
private static final String RETRIES_DOC = "(Deprecated) Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
|
private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
|
||||||
+ " Note that this retry is no different than if the client resent the record upon receiving the error."
|
+ " Note that this retry is no different than if the client resent the record upon receiving the error."
|
||||||
+ " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
|
+ " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
|
||||||
+ " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
|
+ " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
|
||||||
|
@ -251,7 +247,7 @@ public class ProducerConfig extends AbstractConfig {
|
||||||
public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer "
|
public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer "
|
||||||
+ "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
|
+ "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
|
||||||
+ "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to 5, "
|
+ "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to 5, "
|
||||||
+ "<code>" + RETRIES_CONFIG + "</code> (deprecated) to be greater than 0 and <code>" + ACKS_CONFIG + "</code> must be 'all'. If these values "
|
+ "<code>" + RETRIES_CONFIG + "</code> to be greater than 0 and <code>" + ACKS_CONFIG + "</code> must be 'all'. If these values "
|
||||||
+ "are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, "
|
+ "are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, "
|
||||||
+ "a <code>ConfigException</code> will be thrown.";
|
+ "a <code>ConfigException</code> will be thrown.";
|
||||||
|
|
||||||
|
@ -439,7 +435,6 @@ public class ProducerConfig extends AbstractConfig {
|
||||||
@Override
|
@Override
|
||||||
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
|
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
|
||||||
CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
|
CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
|
||||||
CommonClientConfigs.warnIfDeprecatedRetriesValue(this);
|
|
||||||
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
|
Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
|
||||||
maybeOverrideEnableIdempotence(refinedConfigs);
|
maybeOverrideEnableIdempotence(refinedConfigs);
|
||||||
maybeOverrideClientId(refinedConfigs);
|
maybeOverrideClientId(refinedConfigs);
|
||||||
|
|
|
@ -970,7 +970,6 @@ public class KafkaAdminClientTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Test
|
@Test
|
||||||
public void testMetadataRetries() throws Exception {
|
public void testMetadataRetries() throws Exception {
|
||||||
// We should continue retrying on metadata update failures in spite of retry configuration
|
// We should continue retrying on metadata update failures in spite of retry configuration
|
||||||
|
@ -1970,8 +1969,7 @@ public class KafkaAdminClientTest {
|
||||||
@Test
|
@Test
|
||||||
public void testDescribeCluster() throws Exception {
|
public void testDescribeCluster() throws Exception {
|
||||||
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
|
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
|
||||||
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100",
|
AdminClientConfig.RETRIES_CONFIG, "2")) {
|
||||||
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100")) {
|
|
||||||
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
|
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
|
||||||
|
|
||||||
// Prepare the metadata response used for the first describe cluster
|
// Prepare the metadata response used for the first describe cluster
|
||||||
|
@ -2010,8 +2008,7 @@ public class KafkaAdminClientTest {
|
||||||
@Test
|
@Test
|
||||||
public void testListConsumerGroups() throws Exception {
|
public void testListConsumerGroups() throws Exception {
|
||||||
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
|
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
|
||||||
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100",
|
AdminClientConfig.RETRIES_CONFIG, "2")) {
|
||||||
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "500")) {
|
|
||||||
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
|
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
|
||||||
|
|
||||||
// Empty metadata response should be retried
|
// Empty metadata response should be retried
|
||||||
|
@ -2123,8 +2120,7 @@ public class KafkaAdminClientTest {
|
||||||
final Time time = new MockTime();
|
final Time time = new MockTime();
|
||||||
|
|
||||||
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
|
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
|
||||||
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0",
|
AdminClientConfig.RETRIES_CONFIG, "0")) {
|
||||||
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "0")) {
|
|
||||||
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
|
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
|
||||||
|
|
||||||
// Empty metadata causes the request to fail since we have no list of brokers
|
// Empty metadata causes the request to fail since we have no list of brokers
|
||||||
|
@ -2137,7 +2133,6 @@ public class KafkaAdminClientTest {
|
||||||
Collections.emptyList()));
|
Collections.emptyList()));
|
||||||
|
|
||||||
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
|
final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
|
||||||
time.sleep(1L);
|
|
||||||
TestUtils.assertFutureError(result.all(), KafkaException.class);
|
TestUtils.assertFutureError(result.all(), KafkaException.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2210,7 +2205,6 @@ public class KafkaAdminClientTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Test
|
@Test
|
||||||
public void testOffsetCommitNumRetries() throws Exception {
|
public void testOffsetCommitNumRetries() throws Exception {
|
||||||
final Cluster cluster = mockCluster(3, 0);
|
final Cluster cluster = mockCluster(3, 0);
|
||||||
|
@ -2282,7 +2276,6 @@ public class KafkaAdminClientTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Test
|
@Test
|
||||||
public void testDescribeConsumerGroupNumRetries() throws Exception {
|
public void testDescribeConsumerGroupNumRetries() throws Exception {
|
||||||
final Cluster cluster = mockCluster(3, 0);
|
final Cluster cluster = mockCluster(3, 0);
|
||||||
|
@ -2583,7 +2576,6 @@ public class KafkaAdminClientTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Test
|
@Test
|
||||||
public void testListConsumerGroupOffsetsNumRetries() throws Exception {
|
public void testListConsumerGroupOffsetsNumRetries() throws Exception {
|
||||||
final Cluster cluster = mockCluster(3, 0);
|
final Cluster cluster = mockCluster(3, 0);
|
||||||
|
@ -2695,7 +2687,6 @@ public class KafkaAdminClientTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteConsumerGroupsNumRetries() throws Exception {
|
public void testDeleteConsumerGroupsNumRetries() throws Exception {
|
||||||
final Cluster cluster = mockCluster(3, 0);
|
final Cluster cluster = mockCluster(3, 0);
|
||||||
|
@ -2859,7 +2850,6 @@ public class KafkaAdminClientTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception {
|
public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception {
|
||||||
final Cluster cluster = mockCluster(3, 0);
|
final Cluster cluster = mockCluster(3, 0);
|
||||||
|
@ -3144,7 +3134,6 @@ public class KafkaAdminClientTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveMembersFromGroupNumRetries() throws Exception {
|
public void testRemoveMembersFromGroupNumRetries() throws Exception {
|
||||||
final Cluster cluster = mockCluster(3, 0);
|
final Cluster cluster = mockCluster(3, 0);
|
||||||
|
@ -4465,8 +4454,7 @@ public class KafkaAdminClientTest {
|
||||||
long defaultApiTimeout = 60000;
|
long defaultApiTimeout = 60000;
|
||||||
MockTime time = new MockTime();
|
MockTime time = new MockTime();
|
||||||
|
|
||||||
try (AdminClientUnitTestEnv env = mockClientEnv(time,
|
try (AdminClientUnitTestEnv env = mockClientEnv(time, AdminClientConfig.RETRIES_CONFIG, "0")) {
|
||||||
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, String.valueOf(defaultApiTimeout))) {
|
|
||||||
|
|
||||||
// Provide only one prepared response from node 1
|
// Provide only one prepared response from node 1
|
||||||
env.kafkaClient().prepareResponseFrom(
|
env.kafkaClient().prepareResponseFrom(
|
||||||
|
@ -4668,8 +4656,7 @@ public class KafkaAdminClientTest {
|
||||||
long defaultApiTimeout = 60000;
|
long defaultApiTimeout = 60000;
|
||||||
MockTime time = new MockTime();
|
MockTime time = new MockTime();
|
||||||
|
|
||||||
try (AdminClientUnitTestEnv env = mockClientEnv(time,
|
try (AdminClientUnitTestEnv env = mockClientEnv(time, AdminClientConfig.RETRIES_CONFIG, "0")) {
|
||||||
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, String.valueOf(defaultApiTimeout))) {
|
|
||||||
|
|
||||||
env.kafkaClient().prepareResponseFrom(
|
env.kafkaClient().prepareResponseFrom(
|
||||||
prepareDescribeLogDirsResponse(Errors.NONE, "/data"),
|
prepareDescribeLogDirsResponse(Errors.NONE, "/data"),
|
||||||
|
|
|
@ -139,7 +139,6 @@ public class KafkaProducerTest {
|
||||||
Collections.emptySet());
|
Collections.emptySet());
|
||||||
private final int defaultMetadataIdleMs = 5 * 60 * 1000;
|
private final int defaultMetadataIdleMs = 5 * 60 * 1000;
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
@Test
|
@Test
|
||||||
public void testOverwriteAcksAndRetriesForIdempotentProducers() {
|
public void testOverwriteAcksAndRetriesForIdempotentProducers() {
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
|
|
|
@ -148,7 +148,6 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
|
||||||
this.statusTopic = statusTopic;
|
this.statusTopic = statusTopic;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
@Override
|
@Override
|
||||||
public void configure(final WorkerConfig config) {
|
public void configure(final WorkerConfig config) {
|
||||||
this.statusTopic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
|
this.statusTopic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
|
||||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
|
||||||
import org.apache.kafka.common.KafkaException
|
import org.apache.kafka.common.KafkaException
|
||||||
import org.apache.kafka.common.utils.Utils
|
import org.apache.kafka.common.utils.Utils
|
||||||
|
|
||||||
import scala.annotation.nowarn
|
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
object ConsoleProducer {
|
object ConsoleProducer {
|
||||||
|
@ -79,7 +78,6 @@ object ConsoleProducer {
|
||||||
props
|
props
|
||||||
}
|
}
|
||||||
|
|
||||||
@nowarn("cat=deprecation")
|
|
||||||
def producerProps(config: ProducerConfig): Properties = {
|
def producerProps(config: ProducerConfig): Properties = {
|
||||||
val props =
|
val props =
|
||||||
if (config.options.has(config.producerConfigOpt))
|
if (config.options.has(config.producerConfigOpt))
|
||||||
|
@ -148,7 +146,7 @@ object ConsoleProducer {
|
||||||
.describedAs("size")
|
.describedAs("size")
|
||||||
.ofType(classOf[java.lang.Integer])
|
.ofType(classOf[java.lang.Integer])
|
||||||
.defaultsTo(200)
|
.defaultsTo(200)
|
||||||
val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "(Deprecated) Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message.")
|
val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message.")
|
||||||
.withRequiredArg
|
.withRequiredArg
|
||||||
.ofType(classOf[java.lang.Integer])
|
.ofType(classOf[java.lang.Integer])
|
||||||
.defaultsTo(3)
|
.defaultsTo(3)
|
||||||
|
|
|
@ -1665,7 +1665,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
|
||||||
def requestTimeoutMs(timeoutMs: Int): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this }
|
def requestTimeoutMs(timeoutMs: Int): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this }
|
||||||
def deliveryTimeoutMs(timeoutMs: Int): ProducerBuilder = { _deliveryTimeoutMs= timeoutMs; this }
|
def deliveryTimeoutMs(timeoutMs: Int): ProducerBuilder = { _deliveryTimeoutMs= timeoutMs; this }
|
||||||
|
|
||||||
@nowarn("cat=deprecation")
|
|
||||||
override def build(): KafkaProducer[String, String] = {
|
override def build(): KafkaProducer[String, String] = {
|
||||||
val producerProps = propsOverride
|
val producerProps = propsOverride
|
||||||
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
|
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
|
||||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.kafka.common.errors.WakeupException
|
||||||
import org.apache.kafka.common.serialization.StringDeserializer
|
import org.apache.kafka.common.serialization.StringDeserializer
|
||||||
import org.junit.{After, Before}
|
import org.junit.{After, Before}
|
||||||
|
|
||||||
import scala.annotation.nowarn
|
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
|
|
||||||
|
@ -84,7 +83,6 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
|
||||||
new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
|
new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
|
||||||
}
|
}
|
||||||
|
|
||||||
@nowarn("cat=deprecation")
|
|
||||||
def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
|
def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
|
||||||
val opts = new ConsumerGroupCommandOptions(args)
|
val opts = new ConsumerGroupCommandOptions(args)
|
||||||
val service = new ConsumerGroupService(opts, Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString))
|
val service = new ConsumerGroupService(opts, Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString))
|
||||||
|
|
|
@ -33,7 +33,6 @@ import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
|
||||||
import org.junit.{Before, Test}
|
import org.junit.{Before, Test}
|
||||||
import org.scalatest.Assertions.fail
|
import org.scalatest.Assertions.fail
|
||||||
|
|
||||||
import scala.annotation.nowarn
|
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -107,7 +106,6 @@ class LogDirFailureTest extends IntegrationTestHarness {
|
||||||
testProduceAfterLogDirFailureOnLeader(Checkpoint)
|
testProduceAfterLogDirFailureOnLeader(Checkpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
@nowarn("cat=deprecation")
|
|
||||||
@Test
|
@Test
|
||||||
def testReplicaFetcherThreadAfterLogDirFailureOnFollower(): Unit = {
|
def testReplicaFetcherThreadAfterLogDirFailureOnFollower(): Unit = {
|
||||||
this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
|
this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
|
||||||
|
@ -139,7 +137,6 @@ class LogDirFailureTest extends IntegrationTestHarness {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@nowarn("cat=deprecation")
|
|
||||||
def testProduceErrorsFromLogDirFailureOnLeader(failureType: LogDirFailureType): Unit = {
|
def testProduceErrorsFromLogDirFailureOnLeader(failureType: LogDirFailureType): Unit = {
|
||||||
// Disable retries to allow exception to bubble up for validation
|
// Disable retries to allow exception to bubble up for validation
|
||||||
this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
|
this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
|
||||||
|
|
|
@ -64,7 +64,6 @@ import org.apache.zookeeper.data.ACL
|
||||||
import org.junit.Assert._
|
import org.junit.Assert._
|
||||||
import org.scalatest.Assertions.fail
|
import org.scalatest.Assertions.fail
|
||||||
|
|
||||||
import scala.annotation.nowarn
|
|
||||||
import scala.jdk.CollectionConverters._
|
import scala.jdk.CollectionConverters._
|
||||||
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
|
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
|
||||||
import scala.collection.{Map, Seq, mutable}
|
import scala.collection.{Map, Seq, mutable}
|
||||||
|
@ -585,7 +584,6 @@ object TestUtils extends Logging {
|
||||||
/**
|
/**
|
||||||
* Create a (new) producer with a few pre-configured properties.
|
* Create a (new) producer with a few pre-configured properties.
|
||||||
*/
|
*/
|
||||||
@nowarn("cat=deprecation")
|
|
||||||
def createProducer[K, V](brokerList: String,
|
def createProducer[K, V](brokerList: String,
|
||||||
acks: Int = -1,
|
acks: Int = -1,
|
||||||
maxBlockMs: Long = 60 * 1000L,
|
maxBlockMs: Long = 60 * 1000L,
|
||||||
|
|
|
@ -101,14 +101,10 @@
|
||||||
The old variable is deprecated. Note, that the parameter name itself is not affected.
|
The old variable is deprecated. Note, that the parameter name itself is not affected.
|
||||||
(Cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name">KIP-629</a>.)
|
(Cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name">KIP-629</a>.)
|
||||||
</p>
|
</p>
|
||||||
|
|
||||||
<p>
|
<p>
|
||||||
The configuration parameter <code>retries</code> is deprecated in favor of the new parameter <code>task.timeout.ms</code>.
|
The configuration parameter <code>retries</code> is deprecated in favor of the new parameter <code>task.timeout.ms</code>.
|
||||||
Kafka Streams' runtime ignores <code>retries</code> if set, however, it would still forward the parameter
|
Kafka Streams' runtime ignores <code>retries</code> if set, however, it would still forward the parameter
|
||||||
to its internal clients. Note though, that <code>retries</code> is deprecated for the producer and admin client, too.
|
to its internal clients.
|
||||||
Thus, instead of setting <code>retries</code>, you should configure the corresponding client timeouts, namely
|
|
||||||
<code>delivery.timeout.ms</code> and <code>max.block.ms</code> for the producer and
|
|
||||||
<code>default.api.timeout.ms</code> for the admin client.
|
|
||||||
</p>
|
</p>
|
||||||
<p>
|
<p>
|
||||||
We added <code>SlidingWindows</code> as an option for <code>windowedBy()</code> windowed aggregations as described in
|
We added <code>SlidingWindows</code> as an option for <code>windowedBy()</code> windowed aggregations as described in
|
||||||
|
|
|
@ -21,10 +21,9 @@
|
||||||
|
|
||||||
<h5><a id="upgrade_270_notable" href="#upgrade_270_notable">Notable changes in 2.7.0</a></h5>
|
<h5><a id="upgrade_270_notable" href="#upgrade_270_notable">Notable changes in 2.7.0</a></h5>
|
||||||
<ul>
|
<ul>
|
||||||
<li>The configuration parameter <code>retries</code> is deprecated for the producer, admin, and Kafka Streams clients
|
<li>The configuration parameter <code>retries</code> is deprecated for the Kafka Streams client
|
||||||
via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams">KIP-572</a>.
|
via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams">KIP-572</a>.
|
||||||
You should use the producer's <code>delivery.timeout.ms</code> and <code>max.block.ms</code>, admin's
|
You should use the new <code>task.timeout.ms</code> parameters instead.
|
||||||
<code>default.api.timeout.ms</code>, and Kafka Streams' new <code>task.timeout.ms</code> parameters instead.
|
|
||||||
Note that parameter <code>retry.backoff.ms</code> is not impacted by this change.
|
Note that parameter <code>retry.backoff.ms</code> is not impacted by this change.
|
||||||
</li>
|
</li>
|
||||||
<li>Altering non-reconfigurable configs of existent listeners causes <code>InvalidRequestException</code>.
|
<li>Altering non-reconfigurable configs of existent listeners causes <code>InvalidRequestException</code>.
|
||||||
|
|
|
@ -272,7 +272,6 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
|
||||||
this.sslEngineFactoryClass = sslEngineFactoryClass;
|
this.sslEngineFactoryClass = sslEngineFactoryClass;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
@Override
|
@Override
|
||||||
public void activateOptions() {
|
public void activateOptions() {
|
||||||
// check for config parameter validity
|
// check for config parameter validity
|
||||||
|
|
|
@ -693,21 +693,18 @@ public class StreamsConfigTest {
|
||||||
assertThat(producerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), is(nullValue()));
|
assertThat(producerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), is(nullValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldNotOverrideUserConfigRetriesIfExactlyAlphaOnceEnabled() {
|
public void shouldNotOverrideUserConfigRetriesIfExactlyAlphaOnceEnabled() {
|
||||||
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
|
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
|
||||||
shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
|
shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldNotOverrideUserConfigRetriesIfExactlyBetaOnceEnabled() {
|
public void shouldNotOverrideUserConfigRetriesIfExactlyBetaOnceEnabled() {
|
||||||
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
|
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
|
||||||
shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
|
shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated
|
|
||||||
private void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() {
|
private void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() {
|
||||||
final int numberOfRetries = 42;
|
final int numberOfRetries = 42;
|
||||||
props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries);
|
props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries);
|
||||||
|
|
|
@ -45,7 +45,6 @@ public class StreamsBrokerDownResilienceTest {
|
||||||
|
|
||||||
private static final String SINK_TOPIC = "streamsResilienceSink";
|
private static final String SINK_TOPIC = "streamsResilienceSink";
|
||||||
|
|
||||||
@SuppressWarnings("deprecation") // TODO revisit in follow up PR
|
|
||||||
public static void main(final String[] args) throws IOException {
|
public static void main(final String[] args) throws IOException {
|
||||||
if (args.length < 2) {
|
if (args.length < 2) {
|
||||||
System.err.println("StreamsBrokerDownResilienceTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
|
System.err.println("StreamsBrokerDownResilienceTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
|
||||||
|
@ -123,7 +122,6 @@ public class StreamsBrokerDownResilienceTest {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation") // TODO revisit in follow up PR
|
|
||||||
private static boolean confirmCorrectConfigs(final Properties properties) {
|
private static boolean confirmCorrectConfigs(final Properties properties) {
|
||||||
return properties.containsKey(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) &&
|
return properties.containsKey(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) &&
|
||||||
properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)) &&
|
properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)) &&
|
||||||
|
|
|
@ -49,7 +49,6 @@ import static java.time.Duration.ofMillis;
|
||||||
|
|
||||||
public class StreamsOptimizedTest {
|
public class StreamsOptimizedTest {
|
||||||
|
|
||||||
@SuppressWarnings("deprecation") // TODO revisit in follow up PR
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
if (args.length < 1) {
|
if (args.length < 1) {
|
||||||
System.err.println("StreamsOptimizedTest requires one argument (properties-file) but no provided: ");
|
System.err.println("StreamsOptimizedTest requires one argument (properties-file) but no provided: ");
|
||||||
|
|
|
@ -43,7 +43,6 @@ import java.util.Set;
|
||||||
|
|
||||||
public class StreamsStandByReplicaTest {
|
public class StreamsStandByReplicaTest {
|
||||||
|
|
||||||
@SuppressWarnings("deprecation") // TODO revisit in follow up PR
|
|
||||||
public static void main(final String[] args) throws IOException {
|
public static void main(final String[] args) throws IOException {
|
||||||
if (args.length < 2) {
|
if (args.length < 2) {
|
||||||
System.err.println("StreamsStandByReplicaTest are expecting two parameters: " +
|
System.err.println("StreamsStandByReplicaTest are expecting two parameters: " +
|
||||||
|
@ -159,7 +158,6 @@ public class StreamsStandByReplicaTest {
|
||||||
streams.close(Duration.ofSeconds(10));
|
streams.close(Duration.ofSeconds(10));
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation") // TODO revisit in follow up PR
|
|
||||||
private static boolean confirmCorrectConfigs(final Properties properties) {
|
private static boolean confirmCorrectConfigs(final Properties properties) {
|
||||||
return properties.containsKey(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) &&
|
return properties.containsKey(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) &&
|
||||||
properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)) &&
|
properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)) &&
|
||||||
|
|
|
@ -217,7 +217,6 @@ public class VerifiableProducer implements AutoCloseable {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Construct a VerifiableProducer object from command-line arguments. */
|
/** Construct a VerifiableProducer object from command-line arguments. */
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
public static VerifiableProducer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
|
public static VerifiableProducer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
|
||||||
Namespace res = parser.parseArgs(args);
|
Namespace res = parser.parseArgs(args);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue