diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 9b537f79305..0eab4a7a5f6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -117,7 +117,7 @@ import java.util.Set; * *
- * The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker + * The minimum broker version required is 2.1. Methods with stricter requirements will specify the minimum broker * version required. *
*/ @@ -170,8 +170,6 @@ public interface Admin extends AutoCloseable { *
* This is a convenience method for {@link #createTopics(Collection, CreateTopicsOptions)} with default options. * See the overload for more details. - *
- * This operation is supported by brokers with version 0.10.1.0 or higher. * * @param newTopics The new topics to create. * @return The CreateTopicsResult. @@ -189,9 +187,6 @@ public interface Admin extends AutoCloseable { * success for all the brokers to become aware that the topics have been created. * During this time, {@link #listTopics()} and {@link #describeTopics(Collection)} * may not return information about the new topics. - *
- * This operation is supported by brokers with version 0.10.1.0 or higher. The validateOnly option is supported - * from version 0.10.2.0. * * @param newTopics The new topics to create. * @param options The options to use when creating the new topics. @@ -202,8 +197,6 @@ public interface Admin extends AutoCloseable { /** * This is a convenience method for {@link #deleteTopics(TopicCollection, DeleteTopicsOptions)} * with default options. See the overload for more details. - *
- * This operation is supported by brokers with version 0.10.1.0 or higher. * * @param topics The topic names to delete. * @return The DeleteTopicsResult. @@ -215,8 +208,6 @@ public interface Admin extends AutoCloseable { /** * This is a convenience method for {@link #deleteTopics(TopicCollection, DeleteTopicsOptions)} * with default options. See the overload for more details. - *
- * This operation is supported by brokers with version 0.10.1.0 or higher. * * @param topics The topic names to delete. * @param options The options to use when deleting the topics. @@ -231,7 +222,6 @@ public interface Admin extends AutoCloseable { * with default options. See the overload for more details. *
* When using topic IDs, this operation is supported by brokers with inter-broker protocol 2.8 or higher. - * When using topic names, this operation is supported by brokers with version 0.10.1.0 or higher. * * @param topics The topics to delete. * @return The DeleteTopicsResult. @@ -255,7 +245,6 @@ public interface Admin extends AutoCloseable { * return successfully in this case. *
* When using topic IDs, this operation is supported by brokers with inter-broker protocol 2.8 or higher. - * When using topic names, this operation is supported by brokers with version 0.10.1.0 or higher. * * @param topics The topics to delete. * @param options The options to use when deleting the topics. @@ -354,8 +343,6 @@ public interface Admin extends AutoCloseable { /** * This is a convenience method for {@link #describeAcls(AclBindingFilter, DescribeAclsOptions)} with * default options. See the overload for more details. - *
- * This operation is supported by brokers with version 0.11.0.0 or higher. * * @param filter The filter to use. * @return The DescribeAclsResult. @@ -369,8 +356,6 @@ public interface Admin extends AutoCloseable { *
* Note: it may take some time for changes made by {@code createAcls} or {@code deleteAcls} to be reflected * in the output of {@code describeAcls}. - *
- * This operation is supported by brokers with version 0.11.0.0 or higher. * * @param filter The filter to use. * @param options The options to use when listing the ACLs. @@ -381,8 +366,6 @@ public interface Admin extends AutoCloseable { /** * This is a convenience method for {@link #createAcls(Collection, CreateAclsOptions)} with * default options. See the overload for more details. - *
- * This operation is supported by brokers with version 0.11.0.0 or higher. * * @param acls The ACLs to create * @return The CreateAclsResult. @@ -398,8 +381,6 @@ public interface Admin extends AutoCloseable { *
* If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but * no changes will be made. - *
- * This operation is supported by brokers with version 0.11.0.0 or higher. * * @param acls The ACLs to create * @param options The options to use when creating the ACLs. @@ -410,8 +391,6 @@ public interface Admin extends AutoCloseable { /** * This is a convenience method for {@link #deleteAcls(Collection, DeleteAclsOptions)} with default options. * See the overload for more details. - *
- * This operation is supported by brokers with version 0.11.0.0 or higher. * * @param filters The filters to use. * @return The DeleteAclsResult. @@ -424,8 +403,6 @@ public interface Admin extends AutoCloseable { * Deletes access control lists (ACLs) according to the supplied filters. *
* This operation is not transactional so it may succeed for some ACLs while fail for others. - *
- * This operation is supported by brokers with version 0.11.0.0 or higher. * * @param filters The filters to use. * @param options The options to use when deleting the ACLs. @@ -439,8 +416,6 @@ public interface Admin extends AutoCloseable { *
* This is a convenience method for {@link #describeConfigs(Collection, DescribeConfigsOptions)} with default options. * See the overload for more details. - *
- * This operation is supported by brokers with version 0.11.0.0 or higher. * * @param resources See relevant type {@link ConfigResource.Type} * @return The DescribeConfigsResult @@ -472,8 +447,6 @@ public interface Admin extends AutoCloseable { * will throw a {@link org.apache.kafka.common.errors.TimeoutException} exception *
- * This operation is supported by brokers with version 0.11.0.0 or higher. * * @param resources See relevant type {@link ConfigResource.Type} * @param options The options to use when describing configs @@ -534,8 +507,6 @@ public interface Admin extends AutoCloseable { *
* This is a convenience method for {@link #alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options. * See the overload for more details. - *
- * This operation is supported by brokers with version 1.1.0 or higher. * * @param replicaAssignment The replicas with their log directory absolute path * @return The AlterReplicaLogDirsResult @@ -551,8 +522,6 @@ public interface Admin extends AutoCloseable { * log directory if it is not already there. For detailed result, inspect the returned {@link AlterReplicaLogDirsResult} instance. *
* This operation is not transactional so it may succeed for some replicas while fail for others. - *
- * This operation is supported by brokers with version 1.1.0 or higher. * * @param replicaAssignment The replicas with their log directory absolute path * @param options The options to use when changing replica dir @@ -566,8 +535,6 @@ public interface Admin extends AutoCloseable { *
* This is a convenience method for {@link #describeLogDirs(Collection, DescribeLogDirsOptions)} with default options. * See the overload for more details. - *
- * This operation is supported by brokers with version 1.0.0 or higher. * * @param brokers A list of brokers * @return The DescribeLogDirsResult @@ -578,8 +545,6 @@ public interface Admin extends AutoCloseable { /** * Query the information of all log directories on the given set of brokers - *
- * This operation is supported by brokers with version 1.0.0 or higher. * * @param brokers A list of brokers * @param options The options to use when querying log dir info @@ -592,8 +557,6 @@ public interface Admin extends AutoCloseable { *
* This is a convenience method for {@link #describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)} * with default options. See the overload for more details. - *
- * This operation is supported by brokers with version 1.0.0 or higher. * * @param replicas The replicas to query * @return The DescribeReplicaLogDirsResult @@ -604,8 +567,6 @@ public interface Admin extends AutoCloseable { /** * Query the replica log directory information for the specified replicas. - *
- * This operation is supported by brokers with version 1.0.0 or higher. * * @param replicas The replicas to query * @param options The options to use when querying replica log dir info @@ -641,8 +602,6 @@ public interface Admin extends AutoCloseable { * During this time, {@link #describeTopics(Collection)} * may not return information about the new partitions. *
- * This operation is supported by brokers with version 1.0.0 or higher. - *
* The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the * {@link CreatePartitionsResult#values() values()} method of the returned {@link CreatePartitionsResult} *
* This is a convenience method for {@link #deleteRecords(Map, DeleteRecordsOptions)} with default options. * See the overload for more details. - *
- * This operation is supported by brokers with version 0.11.0.0 or higher. * * @param recordsToDelete The topic partitions and related offsets from which records deletion starts. * @return The DeleteRecordsResult. @@ -686,8 +643,6 @@ public interface Admin extends AutoCloseable { /** * Delete records whose offset is smaller than the given offset of the corresponding partition. - *
- * This operation is supported by brokers with version 0.11.0.0 or higher. * * @param recordsToDelete The topic partitions and related offsets from which records deletion starts. * @param options The options to use when deleting records. @@ -712,8 +667,6 @@ public interface Admin extends AutoCloseable { /** * Create a Delegation Token. *
- * This operation is supported by brokers with version 1.1.0 or higher. - *
* The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@link CreateDelegationTokenResult} *
- * This operation is supported by brokers with version 1.1.0 or higher. - *
* The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@link RenewDelegationTokenResult} *
- * This operation is supported by brokers with version 1.1.0 or higher. - *
* The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the * {@link ExpireDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@link ExpireDelegationTokenResult} *
- * This operation is supported by brokers with version 1.1.0 or higher. - *
* The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the * {@link DescribeDelegationTokenResult#delegationTokens() delegationTokens()} method of the returned {@link DescribeDelegationTokenResult} *
- * This operation is supported by brokers with version 2.2.0 or later if preferred election is use; + * This operation is supported by brokers with version 2.2.0 or later if preferred election is used; * otherwise the brokers most be 2.4.0 or higher. *
* The following exceptions can be anticipated when calling {@code get()} on the future obtained diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 70c0f7cadd5..a64bdfb4019 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -61,9 +61,9 @@ import static org.apache.kafka.common.utils.Utils.propsToMap; * The consumer is not thread-safe. See Multi-threaded Processing for more details. * *
  *
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index aed5d75d701..9c726c977a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -138,14 +138,14 @@ import java.util.concurrent.atomic.AtomicReference;
  * the batch.size config. Making this larger can result in more batching, but requires more memory (since we will
  * generally have one of these buffers for each active partition).
  * 
- * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you
- * want to reduce the number of requests you can set linger.ms to something greater than 0. This will
- * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will
+ * By default, a buffer is available to send after a short delay even if there is additional unused space in the buffer.
+ * However, if you want to reduce the number of requests you can set linger.ms to something greater than {@code 5}.
+ * This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will
  * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above,
- * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting
- * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that
+ * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. This setting
+ * would also add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that
  * records that arrive close together in time will generally batch together even with linger.ms=0. So, under heavy load,
- * batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more
+ * batching will occur regardless of the linger configuration; however setting this to something larger can lead to fewer, more
  * efficient requests when not under maximal load at the cost of a small amount of latency.
  * 
  * The buffer.memory controls the total amount of memory available to the producer for buffering. If records
@@ -157,24 +157,19 @@ import java.util.concurrent.atomic.AtomicReference;
  * their ProducerRecord into bytes. You can use the included {@link org.apache.kafka.common.serialization.ByteArraySerializer} or
  * {@link org.apache.kafka.common.serialization.StringSerializer} for simple byte or string types.
  * 
- * From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. + * The KafkaProducer supports two additional modes: the idempotent producer (enabled by default) and the transactional producer. * The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. In particular * producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages * to multiple partitions (and topics!) atomically. *
*
- * From Kafka 3.0, the enable.idempotence configuration defaults to true. When enabling idempotence,
- * retries config will default to Integer.MAX_VALUE and the acks config will
- * default to all. There are no API changes for the idempotent producer, so existing applications will
- * not need to be modified to take advantage of this feature.
- * 
- * To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot
- * be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the retries
- * config unset, as it will be defaulted to Integer.MAX_VALUE. Additionally, if a {@link #send(ProducerRecord)}
- * returns an error even with infinite retries (for instance if the message expires in the buffer before being sent),
- * then it is recommended to shut down the producer and check the contents of the last produced message to ensure that
- * it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.
+ * To ensure idempotence, it is imperative to avoid application level re-sends since these cannot be de-duplicated.
+ * To achieve this, it is recommended to set {@code delivery.timeout.ms} such that retries are handled for the desired
+ * duration by the producer (the {@code retries} config should be left unset - the default is {@code Integer.MAX_VALUE}).
+ * Additionally, if a {@link #send(ProducerRecord)} returns an error even with infinite retries (for instance if the
+ * message expires in the buffer before being sent), then it is recommended to shut down the producer and check the
+ * contents of the last produced message to ensure that it is not duplicated. Finally, the producer can only guarantee
+ * idempotence for messages sent within a single session.
  * 
To use the transactional producer and the attendant APIs, you must set the transactional.id
  * configuration property. If the transactional.id is set, idempotence is automatically enabled along with
@@ -234,9 +229,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * successful writes are marked as aborted, hence keeping the transactional guarantees.
  * 
- * This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support
- * certain client features.  For instance, the transactional APIs need broker versions 0.11.0 or later. You will receive an
- * UnsupportedVersionException when invoking an API that is not available in the running broker version.
+ * This client can communicate with brokers that are version 2.1 or newer. Older brokers may not support
+ * certain client features. For instance, {@code sendOffsetsToTransaction} with all consumer group metadata needs broker
+ * versions 2.5 or later. You will receive an UnsupportedVersionException when invoking an API that is not
+ * available in the running broker version.
  * 
      * This method is a blocking call that waits until the request has been received and acknowledged by the consumer group
@@ -710,8 +702,7 @@ public class KafkaProducer