KAFKA-19397: Ensure consistent metadata usage in produce request and response (#19964)

- Metadata does not have a complete view of the topic-name-to-id mapping during a client
rebootstrap, or after a topic has been deleted and recreated. The solution is to pass the
topic id down and stop trying to resolve it later in the logic.
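
To make the failure mode concrete, here is a minimal, self-contained sketch of the race
(illustrative class and variable names, not Sender internals): a produce request goes out
while the topic has one id, the topic is deleted and recreated before the response arrives,
and a v13+ response carries only the topic id with an empty name, so a lookup against live
metadata can no longer recover the name.

    import org.apache.kafka.common.Uuid;

    import java.util.Map;

    public class StaleTopicIdLookupSketch {
        public static void main(String[] args) {
            Uuid idAtSendTime = Uuid.randomUuid();      // id the in-flight request was built with
            Uuid idAfterRecreate = Uuid.randomUuid();   // id after the topic is deleted and recreated
            String nameInResponse = "";                 // v13+ produce responses omit the topic name

            // Snapshot of id -> name taken when the request was created (what the fix passes down).
            Map<Uuid, String> namesAtSendTime = Map.of(idAtSendTime, "my-topic");

            // Live metadata view at response time only knows the recreated topic.
            Map<Uuid, String> namesAtResponseTime = Map.of(idAfterRecreate, "my-topic");

            // Old behaviour: resolve against live metadata; the old id is gone, so the lookup
            // falls back to the empty response name and the in-flight batch cannot be matched.
            String oldResolution = namesAtResponseTime.getOrDefault(idAtSendTime, nameInResponse);

            // New behaviour: resolve against the snapshot captured at send time.
            String newResolution = namesAtSendTime.getOrDefault(idAtSendTime, nameInResponse);

            System.out.println("old: '" + oldResolution + "', new: '" + newResolution + "'");
        }
    }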

---------

Co-authored-by: Kirk True <kirk@kirktrue.pro>
Author: Omnia Ibrahim, 2025-07-04 17:44:09 +01:00 (committed by Chia-Ping Tsai)
Parent: f14e60fc8f
Commit: e6b78ae9e5
2 changed files with 59 additions and 5 deletions

File: ProducerSendWhileDeletionTest.java

@@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
@@ -42,9 +43,12 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -157,6 +161,42 @@ public class ProducerSendWhileDeletionTest {
}
}
@ClusterTest
public void testSendWhileTopicGetRecreated() {
    int maxNumTopicRecreationAttempts = 5;
    var recreateTopicFuture = CompletableFuture.supplyAsync(() -> {
        var topicIds = new HashSet<Uuid>();
        while (topicIds.size() < maxNumTopicRecreationAttempts) {
            try (var admin = cluster.admin()) {
                if (admin.listTopics().names().get().contains(topic)) {
                    admin.deleteTopics(List.of(topic)).all().get();
                }
                topicIds.add(admin.createTopics(List.of(new NewTopic(topic, 2, (short) 1))).topicId(topic).get());
            } catch (Exception e) {
                // ignore
            }
        }
        return topicIds;
    });

    AtomicInteger numAcks = new AtomicInteger(0);
    var producerFuture = CompletableFuture.runAsync(() -> {
        try (var producer = createProducer()) {
            for (int i = 1; i <= numRecords; i++) {
                producer.send(new ProducerRecord<>(topic, null, ("value" + i).getBytes()),
                    (metadata, exception) -> {
                        numAcks.incrementAndGet();
                    });
            }
            producer.flush();
        }
    });

    var topicIds = recreateTopicFuture.join();
    producerFuture.join();
    assertEquals(maxNumTopicRecreationAttempts, topicIds.size());
    assertEquals(numRecords, numAcks.intValue());
}

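The createProducer() helper used above is not part of this diff. A plausible shape, inferred
from the imports visible in this file, might look like the following (the configuration values
and the use of ByteArraySerializer are assumptions, not the actual helper):

    // Hypothetical helper; the real one in ProducerSendWhileDeletionTest is not shown in this diff.
    // Assumes imports for KafkaProducer, ProducerConfig, ByteArraySerializer and the
    // VALUE_SERIALIZER_CLASS_CONFIG static import, in addition to those listed above.
    private Producer<String, byte[]> createProducer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
        configs.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        configs.put(DELIVERY_TIMEOUT_MS_CONFIG, 30000); // leave room for retries while the topic is recreated
        return new KafkaProducer<>(configs);
    }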
@ClusterTest
public void testSendWithTopicReassignmentIsMidWay() throws Exception {
var partition0 = new TopicPartition(topic, 0);

File: Sender.java

@@ -565,7 +565,7 @@ public class Sender implements Runnable {
/**
* Handle a produce response
*/
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, Map<Uuid, String> topicNames, long now) {
RequestHeader requestHeader = response.requestHeader();
int correlationId = requestHeader.correlationId();
if (response.wasTimedOut()) {
@@ -595,9 +595,6 @@
// This will be set by completeBatch.
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
// Version 13 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name.
String topicName = metadata.topicNames().getOrDefault(r.topicId(), r.name());
TopicPartition tp = new TopicPartition(topicName, p.index());
ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
Errors.forCode(p.errorCode()),
p.baseOffset(),
@@ -609,7 +606,20 @@
.collect(Collectors.toList()),
p.errorMessage(),
p.currentLeader());
// Version 13 drop topic name and add support to topic id.
// We need to find batch based on topic id and partition index only as
// topic name in the response will be empty.
// For older versions, topic id is zero, and we will find the batch based on the topic name.
TopicPartition tp = (!r.topicId().equals(Uuid.ZERO_UUID) && topicNames.containsKey(r.topicId())) ?
new TopicPartition(topicNames.get(r.topicId()), p.index()) :
new TopicPartition(r.name(), p.index());
ProducerBatch batch = batches.get(tp);
if (batch == null) {
throw new IllegalStateException("Can't find batch created for topic id " + r.topicId() +
" topic name " + r.name() + " partition " + p.index() + " using " + topicNames);
}
completeBatch(batch, partResp, correlationId, now, partitionsWithUpdatedLeaderInfo);
}));
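
Restated outside the diff, the batch-lookup rule introduced above amounts to the following
(a standalone sketch with illustrative names, not the actual Sender code): v13+ responses
carry a non-zero topic id and an empty name, so the name comes from the snapshot; older
responses carry Uuid.ZERO_UUID and a real name, so the name from the response is used directly.

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;

    import java.util.Map;

    public class ResponseTopicResolutionSketch {
        static TopicPartition resolve(Uuid topicId, String topicName, int partition,
                                      Map<Uuid, String> topicNamesSnapshot) {
            // Non-zero id known to the snapshot: v13+ path, resolve the name from the snapshot.
            if (!topicId.equals(Uuid.ZERO_UUID) && topicNamesSnapshot.containsKey(topicId)) {
                return new TopicPartition(topicNamesSnapshot.get(topicId), partition);
            }
            // Zero id (pre-v13) or unknown id: fall back to the name carried in the response.
            return new TopicPartition(topicName, partition);
        }

        public static void main(String[] args) {
            Uuid id = Uuid.randomUuid();
            Map<Uuid, String> snapshot = Map.of(id, "my-topic");
            System.out.println(resolve(id, "", 0, snapshot));                     // my-topic-0
            System.out.println(resolve(Uuid.ZERO_UUID, "my-topic", 1, snapshot)); // my-topic-1
        }
    }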
@@ -892,7 +902,11 @@
.setTopicData(tpd),
useTransactionV1Version
);
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
// Fetch topic names from metadata outside callback as topic ids may change during the callback
// for example if topic was recreated.
Map<Uuid, String> topicNames = metadata.topicNames();
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, topicNames, time.milliseconds());
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
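
The snapshot is taken before the callback is constructed so that the handler resolves the
response against the id-to-name view that existed when the request was built, not against
whatever the live metadata holds when the response arrives. A minimal illustration of that
capture pattern in plain Java (no Kafka types; names are illustrative):

    import java.util.Map;
    import java.util.function.Consumer;

    public class SnapshotCaptureSketch {
        public static void main(String[] args) {
            Map<String, String> snapshot = Map.of("id1", "my-topic");     // view at request-creation time

            // The callback closes over the snapshot reference, not the live metadata.
            Consumer<String> callback = id ->
                    System.out.println("handler resolves " + id + " to " + snapshot.getOrDefault(id, "<unknown>"));

            Map<String, String> liveMetadata = Map.of("id2", "my-topic"); // view after delete + recreate

            System.out.println("live view resolves id1 to " + liveMetadata.getOrDefault("id1", "<unknown>"));
            callback.accept("id1"); // still resolves via the captured snapshot
        }
    }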