consumerRecord : records) {
- Header header = consumerRecord.headers().lastHeader("headerKey");
- if (header != null)
- assertEquals("headerValue", new String(header.value()));
- }
+ Header header = records.get(0).headers().lastHeader("headerKey");
+ assertEquals("headerValue", new String(header.value()));
+
+ // Test the order of headers in a record is preserved when producing and consuming
+ Header[] headers = records.get(0).headers().toArray();
+ assertEquals("headerKey", headers[0].key());
+ assertEquals("headerKey2", headers[1].key());
+ assertEquals("headerKey3", headers[2].key());
+
verifyShareGroupStateTopicRecordsProduced();
}
}
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
index f79b3786253..a9489c88327 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerIdExpirationTest.java
@@ -204,10 +204,10 @@ public class ProducerIdExpirationTest {
// Update the producer ID expiration ms to a very high value.
admin.incrementalAlterConfigs(producerIdExpirationConfig("100000"));
- cluster.brokers().values().forEach(broker -> {
+ cluster.brokers().values().forEach(broker ->
TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100000,
- () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100);
- });
+ () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100)
+ );
// Send more records to send producer ID back to brokers.
producer.send(new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes()));
producer.flush();
@@ -226,10 +226,10 @@ public class ProducerIdExpirationTest {
kafkaBroker.awaitShutdown();
kafkaBroker.startup();
cluster.waitForReadyBrokers();
- cluster.brokers().values().forEach(broker -> {
+ cluster.brokers().values().forEach(broker ->
TestUtils.waitUntilTrue(() -> broker.logManager().producerStateManagerConfig().producerIdExpirationMs() == 100,
- () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100);
- });
+ () -> "Configuration was not updated.", DEFAULT_MAX_WAIT_MS, 100)
+ );
// Ensure producer ID expires quickly again.
waitProducerIdExpire(admin);
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
index 4301eecbb9c..aa93431cf63 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
@@ -184,9 +184,8 @@ public class ProducerSendWhileDeletionTest {
try (var producer = createProducer()) {
for (int i = 1; i <= numRecords; i++) {
producer.send(new ProducerRecord<>(topic, null, ("value" + i).getBytes()),
- (metadata, exception) -> {
- numAcks.incrementAndGet();
- });
+ (metadata, exception) -> numAcks.incrementAndGet()
+ );
}
producer.flush();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java
index e28a03d541c..81e889db30d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java
@@ -17,11 +17,20 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.protocol.Errors;
import java.util.Optional;
/**
* Options for {@link Admin#addRaftVoter}.
+ *
+ *
+ * The clusterId is optional.
+ *
+ * If provided, the request will only succeed if the cluster id matches the id of the current cluster.
+ * If the cluster id does not match, the request will fail with {@link Errors#INCONSISTENT_CLUSTER_ID}.
+ *
+ * If not provided, the cluster id check is skipped.
*/
@InterfaceStability.Stable
public class AddRaftVoterOptions extends AbstractOptions {
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index e596754f62f..06ede9f620d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.FeatureUpdateFailedException;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -1866,10 +1867,17 @@ public interface Admin extends AutoCloseable {
/**
* Add a new voter node to the KRaft metadata quorum.
*
+ *
+ * The clusterId in {@link AddRaftVoterOptions} is optional.
+ * If provided, the operation will only succeed if the cluster id matches the id
+ * of the current cluster. If the cluster id does not match, the operation
+ * will fail with {@link InconsistentClusterIdException}.
+ * If not provided, the cluster id check is skipped.
+ *
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
* @param endpoints The endpoints that the new voter has.
- * @param options The options to use when adding the new voter node.
+ * @param options Additional options for the operation, including optional cluster ID.
*/
AddRaftVoterResult addRaftVoter(
int voterId,
@@ -1894,9 +1902,16 @@ public interface Admin extends AutoCloseable {
/**
* Remove a voter node from the KRaft metadata quorum.
*
+ *
+ * The clusterId in {@link AddRaftVoterOptions} is optional.
+ * If provided, the operation will only succeed if the cluster id matches the id
+ * of the current cluster. If the cluster id does not match, the operation
+ * will fail with {@link InconsistentClusterIdException}.
+ * If not provided, the cluster id check is skipped.
+ *
* @param voterId The node ID of the voter.
* @param voterDirectoryId The directory ID of the voter.
- * @param options The options to use when removing the voter node.
+ * @param options Additional options for the operation, including optional cluster ID.
*/
RemoveRaftVoterResult removeRaftVoter(
int voterId,
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java
index a41ec6d00b3..80fd55c7323 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
- * Options for the {@link Admin#deleteShareGroups(Collection , DeleteShareGroupsOptions)} call.
+ * Options for the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
index c2791e681f7..ff53da08df8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.Map;
/**
- * The result of the {@link Admin#deleteShareGroups(Collection , DeleteShareGroupsOptions)} call.
+ * The result of the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteStreamsGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteStreamsGroupsOptions.java
index 6cd14797122..6ca2ec66a27 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteStreamsGroupsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteStreamsGroupsOptions.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
/**
- * Options for the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
+ * Options for the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 90f83eac935..78a7f905319 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2494,8 +2494,7 @@ public class KafkaAdminClient extends AdminClient {
DescribeClusterResponse response = (DescribeClusterResponse) abstractResponse;
Errors error = Errors.forCode(response.data().errorCode());
if (error != Errors.NONE) {
- ApiError apiError = new ApiError(error, response.data().errorMessage());
- handleFailure(apiError.exception());
+ handleFailure(error.exception(response.data().errorMessage()));
return;
}
@@ -2691,10 +2690,9 @@ public class KafkaAdminClient extends AdminClient {
} else {
List filterResults = new ArrayList<>();
for (DeleteAclsMatchingAcl matchingAcl : filterResult.matchingAcls()) {
- ApiError aclError = new ApiError(Errors.forCode(matchingAcl.errorCode()),
- matchingAcl.errorMessage());
+ Errors aclError = Errors.forCode(matchingAcl.errorCode());
AclBinding aclBinding = DeleteAclsResponse.aclBinding(matchingAcl);
- filterResults.add(new FilterResult(aclBinding, aclError.exception()));
+ filterResults.add(new FilterResult(aclBinding, aclError.exception(matchingAcl.errorMessage())));
}
future.complete(new FilterResults(filterResults));
}
@@ -3995,7 +3993,7 @@ public class KafkaAdminClient extends AdminClient {
for (ReassignablePartitionResponse partition : topicResponse.partitions()) {
errors.put(
new TopicPartition(topicName, partition.partitionIndex()),
- new ApiError(topLevelError, response.data().errorMessage()).exception()
+ topLevelError.exception(response.data().errorMessage())
);
receivedResponsesCount += 1;
}
@@ -4035,7 +4033,7 @@ public class KafkaAdminClient extends AdminClient {
if (partitionError == Errors.NONE) {
errors.put(tp, null);
} else {
- errors.put(tp, new ApiError(partitionError, partResponse.errorMessage()).exception());
+ errors.put(tp, partitionError.exception(partResponse.errorMessage()));
}
receivedResponsesCount += 1;
}
@@ -4111,7 +4109,7 @@ public class KafkaAdminClient extends AdminClient {
handleNotControllerError(error);
break;
default:
- partitionReassignmentsFuture.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception());
+ partitionReassignmentsFuture.completeExceptionally(error.exception(response.data().errorMessage()));
break;
}
Map reassignmentMap = new HashMap<>();
@@ -4993,14 +4991,11 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse response) {
handleNotControllerError(response);
AddRaftVoterResponse addResponse = (AddRaftVoterResponse) response;
- if (addResponse.data().errorCode() != Errors.NONE.code()) {
- ApiError error = new ApiError(
- addResponse.data().errorCode(),
- addResponse.data().errorMessage());
- future.completeExceptionally(error.exception());
- } else {
+ Errors error = Errors.forCode(addResponse.data().errorCode());
+ if (error != Errors.NONE)
+ future.completeExceptionally(error.exception(addResponse.data().errorMessage()));
+ else
future.complete(null);
- }
}
@Override
@@ -5038,14 +5033,11 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse response) {
handleNotControllerError(response);
RemoveRaftVoterResponse addResponse = (RemoveRaftVoterResponse) response;
- if (addResponse.data().errorCode() != Errors.NONE.code()) {
- ApiError error = new ApiError(
- addResponse.data().errorCode(),
- addResponse.data().errorMessage());
- future.completeExceptionally(error.exception());
- } else {
+ Errors error = Errors.forCode(addResponse.data().errorCode());
+ if (error != Errors.NONE)
+ future.completeExceptionally(error.exception(addResponse.data().errorMessage()));
+ else
future.complete(null);
- }
}
@Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
index 6fecddc69ef..1a2c8869c6c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java
@@ -46,7 +46,7 @@ public class ListShareGroupOffsetsResult {
/**
* Return the future when the requests for all groups succeed.
*
- * @return Future which yields all {@code Map>} objects, if requests for all the groups succeed.
+ * @return Future which yields all {@code Map>} objects, if requests for all the groups succeed.
*/
public KafkaFuture