KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config (#8864)

- part of KIP-572
- deprecates producer config `retries` (still in use)
- deprecates admin config `retries` (still in use)
- deprecates Kafka Streams config `retries` (will be ignored)
- adds new Kafka Streams config `task.timeout.ms` (follow-up PRs will leverage this new config)

Reviewers: John Roesler <john@confluent.io>, Jason Gustafson <jason@confluent.io>, Randall Hauch <randall@confluent.io>
Author: Matthias J. Sax
Date: 2020-07-21 12:19:13 -07:00 (committed by GitHub)
Parent: b63fecb693
Commit: 194c56fce2
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
32 changed files with 156 additions and 42 deletions
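To make the headline change concrete, here is a minimal sketch of setting the new `task.timeout.ms` parameter in a Kafka Streams application. The application id, broker address, and topics are hypothetical placeholders; note that this commit only defines and validates the config, while the follow-up PRs wire it into the runtime.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class TaskTimeoutExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "task-timeout-example"); // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // hypothetical
        // New in this commit: upper bound on how long a task may stall due to
        // internal errors and retries before an error is raised (default: 5000ms).
        props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 10_000L);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // hypothetical topics
        new KafkaStreams(builder.build(), props).start();
    }
}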

View File

@@ -78,8 +78,9 @@ public class CommonClientConfigs {
     public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";
     public static final String RECONNECT_BACKOFF_MAX_MS_DOC = "The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.";
+    @Deprecated
     public static final String RETRIES_CONFIG = "retries";
-    public static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error.";
+    public static final String RETRIES_DOC = "(Deprecated) Setting a value greater than zero will cause the client to resend any request that fails with a potentially transient error.";
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
     public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";
@@ -193,4 +194,10 @@ public class CommonClientConfigs {
                 CLIENT_DNS_LOOKUP_CONFIG, ClientDnsLookup.DEFAULT,
                 ClientDnsLookup.USE_ALL_DNS_IPS);
     }
+
+    public static void warnIfDeprecatedRetriesValue(AbstractConfig config) {
+        if (config.originals().containsKey(RETRIES_CONFIG)) {
+            log.warn("Configuration '{}' is deprecated and will be removed in future version.", RETRIES_CONFIG);
+        }
+    }
 }
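As a rough illustration of when this new helper fires (broker address and serializers below are placeholders, not part of this commit): any client config whose user-supplied properties include `retries` logs the warning during construction, once the `postProcessParsedConfig` hooks added in the following files call the helper.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class DeprecatedRetriesWarningExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // deprecated; stays in originals()
        // Constructing the config runs postProcessParsedConfig, which now logs:
        // "Configuration 'retries' is deprecated and will be removed in future version."
        new ProducerConfig(props);
    }
}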

View File

@@ -113,6 +113,11 @@ public class AdminClientConfig extends AbstractConfig {
     private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
     private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC;
+    /**
+     * <code>retries</code>
+     * @deprecated since 2.7
+     */
+    @Deprecated
     public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
     public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
@@ -222,6 +227,7 @@ public class AdminClientConfig extends AbstractConfig {
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
         CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
+        CommonClientConfigs.warnIfDeprecatedRetriesValue(this);
         return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
     }
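Per the upgrade notes later in this commit, admin client users should bound calls by time rather than by a retry count. A minimal sketch, assuming a placeholder broker address and illustrative timeout values:

import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class AdminTimeoutExample {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        // Instead of the deprecated retries, bound each call by time:
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);     // per request
        props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 60_000); // per API call, incl. retries
        try (Admin admin = Admin.create(props)) {
            System.out.println(admin.listTopics().names().get());
        }
    }
}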

View File

@@ -531,6 +531,7 @@ public class KafkaAdminClient extends AdminClient {
         return new LogContext("[AdminClient clientId=" + clientId + "] ");
     }
+    @SuppressWarnings("deprecation")
     private KafkaAdminClient(AdminClientConfig config,
                              String clientId,
                              Time time,

View File

@@ -441,6 +441,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
     // visible for testing
+    @SuppressWarnings("deprecation")
     Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
         int maxInflightRequests = configureInflightRequests(producerConfig);
         int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);

View File

@@ -126,7 +126,7 @@ public class ProducerConfig extends AbstractConfig {
         + "after a call to <code>send()</code> returns. This limits the total time that a record will be delayed "
         + "prior to sending, the time to await acknowledgement from the broker (if expected), and the time allowed "
         + "for retriable send failures. The producer may report failure to send a record earlier than this config if "
-        + "either an unrecoverable error is encountered, the retries have been exhausted, "
+        + "either an unrecoverable error is encountered, the retries have been exhausted (deprecated), "
         + "or the record is added to a batch which reached an earlier delivery expiration deadline. "
         + "The value of this config should be greater than or equal to the sum of <code>" + REQUEST_TIMEOUT_MS_CONFIG + "</code> "
         + "and <code>" + LINGER_MS_CONFIG + "</code>.";
@@ -203,9 +203,13 @@ public class ProducerConfig extends AbstractConfig {
         + " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of"
         + " message re-ordering due to retries (i.e., if retries are enabled).";
-    /** <code>retries</code> */
+    /**
+     * <code>retries</code>
+     * @deprecated since 2.7
+     */
+    @Deprecated
     public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
-    private static final String RETRIES_DOC = "Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
+    private static final String RETRIES_DOC = "(Deprecated) Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error."
         + " Note that this retry is no different than if the client resent the record upon receiving the error."
         + " Allowing retries without setting <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to 1 will potentially change the"
         + " ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second"
@@ -247,7 +251,7 @@ public class ProducerConfig extends AbstractConfig {
     public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer "
         + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. "
         + "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be less than or equal to 5, "
-        + "<code>" + RETRIES_CONFIG + "</code> to be greater than 0 and <code>" + ACKS_CONFIG + "</code> must be 'all'. If these values "
+        + "<code>" + RETRIES_CONFIG + "</code> (deprecated) to be greater than 0 and <code>" + ACKS_CONFIG + "</code> must be 'all'. If these values "
         + "are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, "
         + "a <code>ConfigException</code> will be thrown.";
@@ -435,6 +439,7 @@ public class ProducerConfig extends AbstractConfig {
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
         CommonClientConfigs.warnIfDeprecatedDnsLookupValue(this);
+        CommonClientConfigs.warnIfDeprecatedRetriesValue(this);
         Map<String, Object> refinedConfigs = CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
         maybeOverrideEnableIdempotence(refinedConfigs);
         maybeOverrideClientId(refinedConfigs);
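For the producer, the replacement named in the upgrade notes added by this commit is delivery.timeout.ms: the client keeps retrying transient errors until the deadline expires, instead of counting attempts. A hedged sketch with placeholder broker address, topic, and values:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DeliveryTimeoutExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Do not set the deprecated retries; bound the total send attempt by time instead.
        // Must be >= request.timeout.ms + linger.ms, as the doc string above states.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value")); // hypothetical topic
        }
    }
}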

View File

@@ -677,7 +677,8 @@ public class KafkaAdminClientTest {
         try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster,
                 newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
                         AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000000",
-                        AdminClientConfig.RETRIES_CONFIG, "0"))) {
+                        AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100",
+                        AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100"))) {
             // The first request fails with a disconnect
             env.kafkaClient().prepareResponse(null, true);
@@ -1287,7 +1288,8 @@ public class KafkaAdminClientTest {
     @Test
     public void testDescribeCluster() throws Exception {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
-                AdminClientConfig.RETRIES_CONFIG, "2")) {
+                AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100",
+                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             // Prepare the metadata response used for the first describe cluster
@@ -1326,7 +1328,8 @@ public class KafkaAdminClientTest {
     @Test
     public void testListConsumerGroups() throws Exception {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(4, 0),
-                AdminClientConfig.RETRIES_CONFIG, "2")) {
+                AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100",
+                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "500")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             // Empty metadata response should be retried
@@ -1438,7 +1441,8 @@ public class KafkaAdminClientTest {
         final Time time = new MockTime();
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
-                AdminClientConfig.RETRIES_CONFIG, "0")) {
+                AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "0",
+                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "0")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             // Empty metadata causes the request to fail since we have no list of brokers
@@ -1451,6 +1455,7 @@ public class KafkaAdminClientTest {
                 Collections.emptyList()));
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
+            time.sleep(1L);
             TestUtils.assertFutureError(result.all(), KafkaException.class);
         }
     }
@@ -1523,6 +1528,7 @@ public class KafkaAdminClientTest {
         }
     }
+    @Deprecated
     @Test
     public void testOffsetCommitNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -1594,6 +1600,7 @@ public class KafkaAdminClientTest {
         }
     }
+    @Deprecated
     @Test
     public void testDescribeConsumerGroupNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -1784,7 +1791,7 @@ public class KafkaAdminClientTest {
     }
     @Test
-    public void testDescribeMultipleConsumerGroups() throws Exception {
+    public void testDescribeMultipleConsumerGroups() {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@@ -1894,6 +1901,7 @@ public class KafkaAdminClientTest {
         }
     }
+    @Deprecated
     @Test
     public void testListConsumerGroupOffsetsNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -2005,6 +2013,7 @@ public class KafkaAdminClientTest {
         }
     }
+    @Deprecated
     @Test
     public void testDeleteConsumerGroupsNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -2168,6 +2177,7 @@ public class KafkaAdminClientTest {
         }
     }
+    @Deprecated
     @Test
     public void testDeleteConsumerGroupOffsetsNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -2452,6 +2462,7 @@ public class KafkaAdminClientTest {
         }
     }
+    @Deprecated
     @Test
     public void testRemoveMembersFromGroupNumRetries() throws Exception {
         final Cluster cluster = mockCluster(3, 0);
@@ -3705,7 +3716,9 @@ public class KafkaAdminClientTest {
     @Test
     public void testAlterReplicaLogDirsPartialFailure() throws Exception {
-        try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.RETRIES_CONFIG, "0")) {
+        try (AdminClientUnitTestEnv env = mockClientEnv(
+                AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100",
+                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100")) {
             // As we won't retry, this calls fails immediately with a DisconnectException
             env.kafkaClient().prepareResponseFrom(
                 prepareAlterLogDirsResponse(Errors.NONE, "topic", 1),
@@ -3748,7 +3761,9 @@ public class KafkaAdminClientTest {
     @Test
     public void testDescribeLogDirsPartialFailure() throws Exception {
-        try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.RETRIES_CONFIG, "0")) {
+        try (AdminClientUnitTestEnv env = mockClientEnv(
+                AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "100",
+                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100")) {
             // As we won't retry, this calls fails immediately with a DisconnectException
             env.kafkaClient().prepareResponseFrom(
                 prepareDescribeLogDirsResponse(Errors.NONE, "/data"),

View File

@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.clients.consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.MemberInfo;
-import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;

View File

@@ -139,6 +139,7 @@ public class KafkaProducerTest {
             Collections.emptySet());
     private final int defaultMetadataIdleMs = 5 * 60 * 1000;
+    @SuppressWarnings("deprecation")
     @Test
     public void testOverwriteAcksAndRetriesForIdempotentProducers() {
         Properties props = new Properties();

View File

@@ -148,6 +148,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         this.statusTopic = statusTopic;
     }
+    @SuppressWarnings("deprecation")
     @Override
     public void configure(final WorkerConfig config) {
         this.statusTopic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);

View File

@@ -31,6 +31,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.utils.Utils
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 object ConsoleProducer {
@@ -78,6 +79,7 @@ object ConsoleProducer {
     props
   }
+  @nowarn("cat=deprecation")
   def producerProps(config: ProducerConfig): Properties = {
     val props =
       if (config.options.has(config.producerConfigOpt))
@@ -146,7 +148,7 @@ object ConsoleProducer {
       .describedAs("size")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(200)
-    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.")
+    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "(Deprecated) Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.")
      .withRequiredArg
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(3)
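For reference, the flag above maps onto the now-deprecated producer retries and is used roughly like this on the command line (topic name and broker address are placeholders):

bin/kafka-console-producer.sh --broker-list localhost:9092 \
    --topic demo-topic \
    --message-send-max-retries 3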

View File

@@ -1633,6 +1633,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     def requestTimeoutMs(timeoutMs: Int): ProducerBuilder = { _requestTimeoutMs = timeoutMs; this }
     def deliveryTimeoutMs(timeoutMs: Int): ProducerBuilder = { _deliveryTimeoutMs= timeoutMs; this }
+    @nowarn("cat=deprecation")
     override def build(): KafkaProducer[String, String] = {
       val producerProps = propsOverride
       producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)

View File

@@ -32,6 +32,7 @@ import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.junit.{After, Before}
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -83,6 +84,7 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
     new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
   }
+  @nowarn("cat=deprecation")
   def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
     val opts = new ConsumerGroupCommandOptions(args)
     val service = new ConsumerGroupService(opts, Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString))

View File

@@ -33,6 +33,7 @@ import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.{Before, Test}
 import org.scalatest.Assertions.fail
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 /**
@@ -106,6 +107,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     testProduceAfterLogDirFailureOnLeader(Checkpoint)
   }
+  @nowarn("cat=deprecation")
   @Test
   def testReplicaFetcherThreadAfterLogDirFailureOnFollower(): Unit = {
     this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
@@ -137,6 +139,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     }
   }
+  @nowarn("cat=deprecation")
   def testProduceErrorsFromLogDirFailureOnLeader(failureType: LogDirFailureType): Unit = {
     // Disable retries to allow exception to bubble up for validation
     this.producerConfig.setProperty(ProducerConfig.RETRIES_CONFIG, "0")

View File

@@ -30,7 +30,7 @@ import javax.net.ssl.X509TrustManager
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log._
-import kafka.security.auth.{Acl, Authorizer => LegacyAuthorizer, Resource}
+import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer}
 import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import com.yammer.metrics.core.Meter
@@ -64,6 +64,7 @@ import org.apache.zookeeper.data.ACL
 import org.junit.Assert._
 import org.scalatest.Assertions.fail
+import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.{Map, Seq, mutable}
@@ -580,6 +581,7 @@ object TestUtils extends Logging {
   /**
    * Create a (new) producer with a few pre-configured properties.
    */
+  @nowarn("cat=deprecation")
   def createProducer[K, V](brokerList: String,
                            acks: Int = -1,
                            maxBlockMs: Long = 60 * 1000L,

View File

@@ -94,6 +94,14 @@
         (Cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-626%3A+Rename+StreamsConfig+config+variable+name">KIP-629</a>.)
     </p>
+    <p>
+        The configuration parameter <code>retries</code> is deprecated in favor of the new parameter <code>task.timeout.ms</code>.
+        The Kafka Streams runtime ignores <code>retries</code> if set; however, it would still forward the parameter
+        to its internal clients. Note that <code>retries</code> is deprecated for the producer and admin client, too.
+        Thus, instead of setting <code>retries</code>, you should configure the corresponding client timeouts.
+    </p>
     <h3><a id="streams_api_changes_260" href="#streams_api_changes_260">Streams API changes in 2.6.0</a></h3>
     <p>
         We added a new processing mode that improves application scalability using exactly-once guarantees

View File

@@ -19,6 +19,16 @@
 <script id="upgrade-template" type="text/x-handlebars-template">
+<h5><a id="upgrade_270_notable" href="#upgrade_270_notable">Notable changes in 2.7.0</a></h5>
+<ul>
+    <li>The configuration parameter <code>retries</code> is deprecated for the producer, admin, and Kafka Streams clients
+        via <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams">KIP-572</a>.
+        You should use the producer's <code>delivery.timeout.ms</code>, the admin client's <code>default.api.timeout.ms</code>, and
+        Kafka Streams' new <code>task.timeout.ms</code> parameters instead.
+        Note that the parameter <code>retry.backoff.ms</code> is not impacted by this change.
+    </li>
+</ul>
 <h5><a id="upgrade_260_notable" href="#upgrade_260_notable">Notable changes in 2.6.0</a></h5>
 <ul>
     <li>Kafka Streams adds a new processing mode (requires broker 2.5 or newer) that improves application
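Taken together, a Kafka Streams application that previously tuned retries might now set the three replacements listed above. A minimal sketch (values are illustrative; the prefix helpers are the standard StreamsConfig methods for targeting the embedded clients):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RetriesMigrationExample {
    public static Properties migratedProps() {
        final Properties props = new Properties();
        // replaces retries at the Streams layer:
        props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 10_000L);
        // replaces retries for the embedded producer:
        props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 120_000);
        // replaces retries for the embedded admin client:
        props.put(StreamsConfig.adminClientPrefix(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 60_000);
        return props;
    }
}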

View File

@@ -252,6 +252,7 @@ public class KafkaLog4jAppender extends AppenderSkeleton {
         this.sslEngineFactoryClass = sslEngineFactoryClass;
     }
+    @SuppressWarnings("deprecation")
     @Override
     public void activateOptions() {
         // check for config parameter validity

View File

@@ -44,6 +44,7 @@ import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Locale;
@@ -488,8 +489,15 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
-    /** {@code retries} */
+    /**
+     * {@code retries}
+     * <p>
+     * This config is ignored by Kafka Streams. Note that the internal clients (producer, admin) are still impacted by this config.
+     *
+     * @deprecated since 2.7
+     */
     @SuppressWarnings("WeakerAccess")
+    @Deprecated
     public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
     /** {@code retry.backoff.ms} */
@@ -519,6 +527,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String STATE_DIR_CONFIG = "state.dir";
     private static final String STATE_DIR_DOC = "Directory location for state store. This path must be unique for each streams instance sharing the same underlying filesystem.";
+    /** {@code task.timeout.ms} */
+    public static final String TASK_TIMEOUT_MS_CONFIG = "task.timeout.ms";
+    public static final String TASK_TIMEOUT_MS_DOC = "Max amount of time a task might stall due to internal errors and retries until an error is raised. " +
+        "For a timeout of 0ms, a task would raise an error for the first internal error. " +
+        "For any timeout larger than 0ms, a task will retry at least once before an error is raised.";
     /** {@code topology.optimization} */
     public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
     private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
@@ -677,6 +691,12 @@ public class StreamsConfig extends AbstractConfig {
                     CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
                     Importance.MEDIUM,
                     CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+            .define(TASK_TIMEOUT_MS_CONFIG,
+                    Type.LONG,
+                    Duration.ofSeconds(5L).toMillis(),
+                    atLeast(0L),
+                    Importance.MEDIUM,
+                    TASK_TIMEOUT_MS_DOC)
             .define(TOPOLOGY_OPTIMIZATION_CONFIG,
                     Type.STRING,
                     NO_OPTIMIZATION,
@@ -1000,6 +1020,9 @@ public class StreamsConfig extends AbstractConfig {
         if (props.containsKey(PARTITION_GROUPER_CLASS_CONFIG)) {
             log.warn("Configuration parameter `{}` is deprecated and will be removed in 3.0.0 release.", PARTITION_GROUPER_CLASS_CONFIG);
         }
+        if (props.containsKey(RETRIES_CONFIG)) {
+            log.warn("Configuration parameter `{}` is deprecated and will be removed in 3.0.0 release.", RETRIES_CONFIG);
+        }
     }
     @Override
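A small sketch of the definition's effect, assuming only the two required Streams configs are set (application id and broker address below are placeholders): the default resolves to five seconds, and the atLeast(0) validator rejects negative values.

import java.util.Properties;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.streams.StreamsConfig;

public class TaskTimeoutDefaultExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "defaults-example");  // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        final StreamsConfig config = new StreamsConfig(props);
        // Default from the .define() above: Duration.ofSeconds(5L).toMillis()
        System.out.println(config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG)); // prints 5000
        try {
            props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, -1L);
            new StreamsConfig(props); // atLeast(0L) rejects negative values
        } catch (final ConfigException expected) {
            System.out.println(expected.getMessage());
        }
    }
}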

View File

@@ -75,6 +75,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     private final Map<String, String> storeToChangelogTopic;
     private final List<StateStore> globalStateStores;
+    @SuppressWarnings("deprecation") // TODO: remove in follow up PR when `RETRIES` is removed
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
                                   final Consumer<byte[], byte[]> globalConsumer,

View File

@@ -55,6 +55,7 @@ public class InternalTopicManager {
     private final int retries;
     private final long retryBackOffMs;
+    @SuppressWarnings("deprecation") // TODO: remove in follow up PR when `RETRIES` is removed
     public InternalTopicManager(final Admin adminClient, final StreamsConfig streamsConfig) {
         this.adminClient = adminClient;
View File

@@ -159,6 +159,7 @@ public class StreamsConfigTest {
         assertNull(returnedProps.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
     }
+    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     @Test
     public void consumerConfigMustContainStreamPartitionAssignorConfig() {
         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 42);
@@ -190,6 +191,7 @@ public class StreamsConfigTest {
         assertEquals(100, returnedProps.get(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG)));
     }
+    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     @Test
     public void consumerConfigShouldContainAdminClientConfigsForRetriesAndRetryBackOffMsWithAdminPrefix() {
         props.put(StreamsConfig.adminClientPrefix(StreamsConfig.RETRIES_CONFIG), 20);
@@ -372,10 +374,10 @@ public class StreamsConfigTest {
     @Test
     public void shouldSupportNonPrefixedAdminConfigs() {
-        props.put(AdminClientConfig.RETRIES_CONFIG, 10);
+        props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> configs = streamsConfig.getAdminConfigs(clientId);
-        assertEquals(10, configs.get(AdminClientConfig.RETRIES_CONFIG));
+        assertEquals(10, configs.get(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
     }
     @Test(expected = StreamsException.class)
@@ -710,18 +712,21 @@ public class StreamsConfigTest {
         assertThat(producerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), is(nullValue()));
     }
+    @Deprecated
     @Test
     public void shouldNotOverrideUserConfigRetriesIfExactlyAlphaOnceEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
     }
+    @Deprecated
     @Test
     public void shouldNotOverrideUserConfigRetriesIfExactlyBetaOnceEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
         shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
     }
+    @Deprecated
     private void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() {
         final int numberOfRetries = 42;
         props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries);
@@ -934,6 +939,23 @@ public class StreamsConfigTest {
         }
     }
+    @SuppressWarnings("deprecation")
+    @Test
+    public void shouldLogWarningWhenRetriesIsUsed() {
+        props.put(StreamsConfig.RETRIES_CONFIG, 0);
+        LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
+        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            new StreamsConfig(props);
+            assertThat(
+                appender.getMessages(),
+                hasItem("Configuration parameter `" + StreamsConfig.RETRIES_CONFIG +
+                    "` is deprecated and will be removed in 3.0.0 release.")
+            );
+        }
+    }
     @Test
     public void shouldSetDefaultAcceptableRecoveryLag() {
         final StreamsConfig config = new StreamsConfig(props);

View File

@@ -131,7 +131,6 @@ public abstract class AbstractResetIntegrationTest {
         producerConfig = new Properties();
         producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
         producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         producerConfig.putAll(commonClientConfig);
@@ -188,7 +187,7 @@ public abstract class AbstractResetIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
     }
-    private void add10InputElements() throws java.util.concurrent.ExecutionException, InterruptedException {
+    private void add10InputElements() {
         final List<KeyValue<Long, String>> records = Arrays.asList(KeyValue.pair(0L, "aaa"),
             KeyValue.pair(1L, "bbb"),
             KeyValue.pair(0L, "ccc"),
@@ -280,7 +279,7 @@ public abstract class AbstractResetIntegrationTest {
         // Reset would fail since long session timeout has been configured
         final boolean cleanResult = tryCleanGlobal(false, null, null);
-        Assert.assertEquals(false, cleanResult);
+        Assert.assertFalse(cleanResult);
         // Reset will success with --force, it will force delete active members on broker side
         cleanGlobal(false, "--force", null);
@@ -604,7 +603,7 @@ public abstract class AbstractResetIntegrationTest {
                              final String resetScenario,
                              final String resetScenarioArg) throws Exception {
         final boolean cleanResult = tryCleanGlobal(withIntermediateTopics, resetScenario, resetScenarioArg);
-        Assert.assertEquals(true, cleanResult);
+        Assert.assertTrue(cleanResult);
     }
     private void assertInternalTopicsGotDeleted(final String intermediateUserTopic) throws Exception {

View File

@@ -101,11 +101,10 @@ public class InternalTopicIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(streamsProp);
     }
-    private void produceData(final List<String> inputValues) throws Exception {
+    private void produceData(final List<String> inputValues) {
         final Properties producerProp = new Properties();
         producerProp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         producerProp.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerProp.put(ProducerConfig.RETRIES_CONFIG, 0);
         producerProp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         producerProp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -137,7 +136,7 @@ public class InternalTopicIntegrationTest {
     }
     @Test
-    public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
+    public void shouldCompactTopicsForKeyValueStoreChangelogs() {
         final String appID = APP_ID + "-compact";
         streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
@@ -174,7 +173,7 @@ public class InternalTopicIntegrationTest {
     }
     @Test
-    public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Exception {
+    public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() {
         final String appID = APP_ID + "-compact-delete";
         streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);

View File

@@ -51,6 +51,7 @@ import org.junit.experimental.categories.Category;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
@@ -91,19 +92,16 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
         PRODUCER_CONFIG_1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         PRODUCER_CONFIG_1.put(ProducerConfig.ACKS_CONFIG, "all");
-        PRODUCER_CONFIG_1.put(ProducerConfig.RETRIES_CONFIG, 0);
         PRODUCER_CONFIG_1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         PRODUCER_CONFIG_1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, FloatSerializer.class);
         PRODUCER_CONFIG_2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         PRODUCER_CONFIG_2.put(ProducerConfig.ACKS_CONFIG, "all");
-        PRODUCER_CONFIG_2.put(ProducerConfig.RETRIES_CONFIG, 0);
         PRODUCER_CONFIG_2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         PRODUCER_CONFIG_2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
         PRODUCER_CONFIG_3.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         PRODUCER_CONFIG_3.put(ProducerConfig.ACKS_CONFIG, "all");
-        PRODUCER_CONFIG_3.put(ProducerConfig.RETRIES_CONFIG, 0);
         PRODUCER_CONFIG_3.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
         PRODUCER_CONFIG_3.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
@@ -136,8 +134,8 @@ public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
         );
         //Partitions pre-computed using the default Murmur2 hash, just to ensure that all 3 partitions will be exercised.
-        final List<KeyValue<Integer, String>> table3 = Arrays.asList(
+        final List<KeyValue<Integer, String>> table3 = Collections.singletonList(
             new KeyValue<>(10, "waffle")
         );
         IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, PRODUCER_CONFIG_1, MOCK_TIME);

View File

@@ -388,7 +388,7 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize();
         // register the stores
         initializeConsumer(1, 0, t1);
-        stateManager.registerStore(new NoOpReadOnlyStore(store1.name()) {
+        stateManager.registerStore(new NoOpReadOnlyStore<Object, Object>(store1.name()) {
             @Override
             public void flush() {
                 throw new RuntimeException("KABOOM!");
@@ -416,7 +416,7 @@ public class GlobalStateManagerImplTest {
     public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws IOException {
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
-        stateManager.registerStore(new NoOpReadOnlyStore(store1.name()) {
+        stateManager.registerStore(new NoOpReadOnlyStore<Object, Object>(store1.name()) {
             @Override
             public void close() {
                 throw new RuntimeException("KABOOM!");
@@ -454,7 +454,7 @@ public class GlobalStateManagerImplTest {
     public void shouldNotCloseStoresIfCloseAlreadyCalled() throws IOException {
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
-        stateManager.registerStore(new NoOpReadOnlyStore("t1-store") {
+        stateManager.registerStore(new NoOpReadOnlyStore<Object, Object>("t1-store") {
             @Override
             public void close() {
                 if (!isOpen()) {
@@ -472,7 +472,7 @@ public class GlobalStateManagerImplTest {
     public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws IOException {
         stateManager.initialize();
         initializeConsumer(1, 0, t1);
-        final NoOpReadOnlyStore store = new NoOpReadOnlyStore("t1-store") {
+        final NoOpReadOnlyStore<Object, Object> store = new NoOpReadOnlyStore<Object, Object>("t1-store") {
             @Override
             public void close() {
                 super.close();
@@ -612,6 +612,7 @@ public class GlobalStateManagerImplTest {
         }
     }
+    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     @Test
     public void shouldRetryWhenEndOffsetsThrowsTimeoutException() {
         final int retries = 2;
@@ -645,6 +646,7 @@ public class GlobalStateManagerImplTest {
         }
     }
+    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     @Test
     public void shouldRetryWhenPartitionsForThrowsTimeoutException() {
         final int retries = 2;
@@ -716,7 +718,7 @@ public class GlobalStateManagerImplTest {
         }
     }
-    private class ConverterStore<K, V> extends NoOpReadOnlyStore<K, V> implements TimestampedBytesStore {
+    private static class ConverterStore<K, V> extends NoOpReadOnlyStore<K, V> implements TimestampedBytesStore {
         ConverterStore(final String name,
                        final boolean rocksdbStore) {
             super(name, rocksdbStore);

View File

@@ -78,6 +78,7 @@ public class InternalTopicManagerTest {
     private MockAdminClient mockAdminClient;
     private InternalTopicManager internalTopicManager;
+    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     private final Map<String, Object> config = new HashMap<String, Object>() {
         {
             put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");

View File

@@ -17,20 +17,20 @@
 package org.apache.kafka.streams.tests;
+import java.time.Duration;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -45,6 +45,7 @@ public class StreamsBrokerDownResilienceTest {
     private static final String SINK_TOPIC = "streamsResilienceSink";
+    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     public static void main(final String[] args) throws IOException {
         if (args.length < 2) {
             System.err.println("StreamsBrokerDownResilienceTest are expecting two parameters: propFile, additionalConfigs; but only see " + args.length + " parameter");
@@ -122,6 +123,7 @@ public class StreamsBrokerDownResilienceTest {
         });
     }
+    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     private static boolean confirmCorrectConfigs(final Properties properties) {
         return properties.containsKey(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) &&
             properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)) &&

View File

@@ -49,7 +49,7 @@ import static java.time.Duration.ofMillis;
 public class StreamsOptimizedTest {
+    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     public static void main(final String[] args) throws Exception {
         if (args.length < 1) {
             System.err.println("StreamsOptimizedTest requires one argument (properties-file) but no provided: ");

View File

@@ -43,6 +43,7 @@ import java.util.Set;
 public class StreamsStandByReplicaTest {
+    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     public static void main(final String[] args) throws IOException {
         if (args.length < 2) {
             System.err.println("StreamsStandByReplicaTest are expecting two parameters: " +
@@ -158,6 +159,7 @@ public class StreamsStandByReplicaTest {
         streams.close(Duration.ofSeconds(10));
     }
+    @SuppressWarnings("deprecation") // TODO revisit in follow up PR
     private static boolean confirmCorrectConfigs(final Properties properties) {
         return properties.containsKey(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) &&
             properties.containsKey(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG)) &&

View File

@@ -188,7 +188,6 @@ class WordCountTest extends WordCountTestData {
     val p = new Properties()
     p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
     p.put(ProducerConfig.ACKS_CONFIG, "all")
-    p.put(ProducerConfig.RETRIES_CONFIG, "0")
     p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
     p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
     p

View File

@@ -73,7 +73,6 @@ class StreamToTableJoinScalaIntegrationTestBase extends StreamToTableJoinTestDat
     val p = new Properties()
     p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
     p.put(ProducerConfig.ACKS_CONFIG, "all")
-    p.put(ProducerConfig.RETRIES_CONFIG, "0")
     p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
     p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
     p
@@ -83,7 +82,6 @@ class StreamToTableJoinScalaIntegrationTestBase extends StreamToTableJoinTestDat
     val p = new Properties()
     p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
     p.put(ProducerConfig.ACKS_CONFIG, "all")
-    p.put(ProducerConfig.RETRIES_CONFIG, "0")
     p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
     p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[LongSerializer])
     p

View File

@@ -217,6 +217,7 @@ public class VerifiableProducer implements AutoCloseable {
     }
     /** Construct a VerifiableProducer object from command-line arguments. */
+    @SuppressWarnings("deprecation")
     public static VerifiableProducer createFromArgs(ArgumentParser parser, String[] args) throws ArgumentParserException {
         Namespace res = parser.parseArgs(args);