mirror of https://github.com/apache/kafka.git
KAFKA-19397: Ensure consistent metadata usage in produce request and response (#19964)
- Metadata doesn't have the full view of topicNames to ids during rebootstrap of client or when topic has been deleted/recreated. The solution is to pass down topic id and stop trying to figure it out later in the logic. --------- Co-authored-by: Kirk True <kirk@kirktrue.pro>
This commit is contained in:
parent
da4fbba279
commit
9df616da76
|
@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
|
|||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.TopicPartitionInfo;
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.common.test.ClusterInstance;
|
||||
|
@ -42,9 +43,12 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
|
||||
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
|
||||
|
@ -157,6 +161,42 @@ public class ProducerSendWhileDeletionTest {
|
|||
}
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
public void testSendWhileTopicGetRecreated() {
|
||||
int maxNumTopicRecreationAttempts = 5;
|
||||
var recreateTopicFuture = CompletableFuture.supplyAsync(() -> {
|
||||
var topicIds = new HashSet<Uuid>();
|
||||
while (topicIds.size() < maxNumTopicRecreationAttempts) {
|
||||
try (var admin = cluster.admin()) {
|
||||
if (admin.listTopics().names().get().contains(topic)) {
|
||||
admin.deleteTopics(List.of(topic)).all().get();
|
||||
}
|
||||
topicIds.add(admin.createTopics(List.of(new NewTopic(topic, 2, (short) 1))).topicId(topic).get());
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
return topicIds;
|
||||
});
|
||||
|
||||
AtomicInteger numAcks = new AtomicInteger(0);
|
||||
var producerFuture = CompletableFuture.runAsync(() -> {
|
||||
try (var producer = createProducer()) {
|
||||
for (int i = 1; i <= numRecords; i++) {
|
||||
producer.send(new ProducerRecord<>(topic, null, ("value" + i).getBytes()),
|
||||
(metadata, exception) -> {
|
||||
numAcks.incrementAndGet();
|
||||
});
|
||||
}
|
||||
producer.flush();
|
||||
}
|
||||
});
|
||||
var topicIds = recreateTopicFuture.join();
|
||||
producerFuture.join();
|
||||
assertEquals(maxNumTopicRecreationAttempts, topicIds.size());
|
||||
assertEquals(numRecords, numAcks.intValue());
|
||||
}
|
||||
|
||||
@ClusterTest
|
||||
public void testSendWithTopicReassignmentIsMidWay() throws Exception {
|
||||
var partition0 = new TopicPartition(topic, 0);
|
||||
|
|
|
@ -565,7 +565,7 @@ public class Sender implements Runnable {
|
|||
/**
|
||||
* Handle a produce response
|
||||
*/
|
||||
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
|
||||
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, Map<Uuid, String> topicNames, long now) {
|
||||
RequestHeader requestHeader = response.requestHeader();
|
||||
int correlationId = requestHeader.correlationId();
|
||||
if (response.wasTimedOut()) {
|
||||
|
@ -595,9 +595,6 @@ public class Sender implements Runnable {
|
|||
// This will be set by completeBatch.
|
||||
Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
|
||||
produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
|
||||
// Version 13 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name.
|
||||
String topicName = metadata.topicNames().getOrDefault(r.topicId(), r.name());
|
||||
TopicPartition tp = new TopicPartition(topicName, p.index());
|
||||
ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
|
||||
Errors.forCode(p.errorCode()),
|
||||
p.baseOffset(),
|
||||
|
@ -609,7 +606,20 @@ public class Sender implements Runnable {
|
|||
.collect(Collectors.toList()),
|
||||
p.errorMessage(),
|
||||
p.currentLeader());
|
||||
|
||||
// Version 13 drop topic name and add support to topic id.
|
||||
// We need to find batch based on topic id and partition index only as
|
||||
// topic name in the response will be empty.
|
||||
// For older versions, topic id is zero, and we will find the batch based on the topic name.
|
||||
TopicPartition tp = (!r.topicId().equals(Uuid.ZERO_UUID) && topicNames.containsKey(r.topicId())) ?
|
||||
new TopicPartition(topicNames.get(r.topicId()), p.index()) :
|
||||
new TopicPartition(r.name(), p.index());
|
||||
|
||||
ProducerBatch batch = batches.get(tp);
|
||||
if (batch == null) {
|
||||
throw new IllegalStateException("Can't find batch created for topic id " + r.topicId() +
|
||||
" topic name " + r.name() + " partition " + p.index() + " using " + topicNames);
|
||||
}
|
||||
completeBatch(batch, partResp, correlationId, now, partitionsWithUpdatedLeaderInfo);
|
||||
}));
|
||||
|
||||
|
@ -892,7 +902,11 @@ public class Sender implements Runnable {
|
|||
.setTopicData(tpd),
|
||||
useTransactionV1Version
|
||||
);
|
||||
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
|
||||
// Fetch topic names from metadata outside callback as topic ids may change during the callback
|
||||
// for example if topic was recreated.
|
||||
Map<Uuid, String> topicNames = metadata.topicNames();
|
||||
|
||||
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, topicNames, time.milliseconds());
|
||||
|
||||
String nodeId = Integer.toString(destination);
|
||||
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
|
||||
|
|
Loading…
Reference in New Issue