KAFKA-18632: Multibroker test improvements. (#18718)

Reviewers: Andrew Schofield <aschofield@confluent.io>
Sushant Mahajan 2025-01-29 22:33:43 +05:30 committed by GitHub
parent 048dfeffd0
commit 632aedcf4f
2 changed files with 209 additions and 114 deletions

ShareCoordinatorMetadataCacheHelperImpl.java

@ -20,6 +20,7 @@ package kafka.server.metadata;
import kafka.server.MetadataCache;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
@ -27,6 +28,9 @@ import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@ -41,6 +45,7 @@ public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinator
private final MetadataCache metadataCache;
private final Function<SharePartitionKey, Integer> keyToPartitionMapper;
private final ListenerName interBrokerListenerName;
private final Logger log = LoggerFactory.getLogger(ShareCoordinatorMetadataCacheHelperImpl.class);
public ShareCoordinatorMetadataCacheHelperImpl(
MetadataCache metadataCache,
@ -63,35 +68,39 @@ public class ShareCoordinatorMetadataCacheHelperImpl implements ShareCoordinator
@Override
public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
    try {
        if (metadataCache.contains(internalTopicName)) {
            Set<String> topicSet = new HashSet<>();
            topicSet.add(internalTopicName);

            List<MetadataResponseData.MetadataResponseTopic> topicMetadata = CollectionConverters.asJava(
                metadataCache.getTopicMetadata(
                    CollectionConverters.asScala(topicSet),
                    interBrokerListenerName,
                    false,
                    false
                )
            );

            if (topicMetadata == null || topicMetadata.isEmpty() || topicMetadata.get(0).errorCode() != Errors.NONE.code()) {
                return Node.noNode();
            } else {
                int partition = keyToPartitionMapper.apply(key);
                Optional<MetadataResponseData.MetadataResponsePartition> response = topicMetadata.get(0).partitions().stream()
                    .filter(responsePart -> responsePart.partitionIndex() == partition
                        && responsePart.leaderId() != MetadataResponse.NO_LEADER_ID)
                    .findFirst();

                if (response.isPresent()) {
                    return OptionConverters.toJava(metadataCache.getAliveBrokerNode(response.get().leaderId(), interBrokerListenerName))
                        .orElse(Node.noNode());
                } else {
                    return Node.noNode();
                }
            }
        }
    } catch (CoordinatorNotAvailableException e) {
        log.warn("Coordinator not available", e);
    }
    return Node.noNode();
}
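With this change, a transient CoordinatorNotAvailableException surfaces to callers as Node.noNode() instead of an exception, so an empty node means "try again later". The sketch below illustrates that calling pattern; it is not part of the commit: the resolveOrRetry helper and its retry/backoff loop are hypothetical, and it assumes the Topic.SHARE_GROUP_STATE_TOPIC_NAME constant from org.apache.kafka.common.internals (the internal topic name used by the test below).

import org.apache.kafka.common.Node;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;

final class ShareCoordinatorLookupSketch {
    // Hypothetical caller: poll the metadata-cache helper until it reports a live coordinator,
    // treating Node.noNode() (returned on CoordinatorNotAvailableException or missing metadata)
    // as "not available yet".
    static Node resolveOrRetry(ShareCoordinatorMetadataCacheHelper helper, SharePartitionKey key, int attempts)
            throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
            Node node = helper.getShareCoordinator(key, Topic.SHARE_GROUP_STATE_TOPIC_NAME);
            if (!node.isEmpty()) {
                return node; // leader of the __share_group_state partition that owns this key
            }
            Thread.sleep(100L); // back off briefly; the coordinator may still be moving
        }
        return Node.noNode();
    }
}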

ShareConsumerTest.java

@ -55,6 +55,8 @@ import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
@ -88,6 +90,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -1832,86 +1835,142 @@ public class ShareConsumerTest {
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
}
)
@Timeout(90)
public void testShareConsumerAfterCoordinatorMovement() throws Exception {
    setup();
    String topicName = "multipart";
    String groupId = "multipartGrp";
    Uuid topicId = createTopic(topicName, 3, 3);
    alterShareAutoOffsetReset(groupId, "earliest");

    TopicPartition tpMulti = new TopicPartition(topicName, 0);
    ScheduledExecutorService service = Executors.newScheduledThreadPool(5);

    // produce some messages
    ClientState prodState = new ClientState();
    final Set<String> produced = new HashSet<>();
    service.execute(() -> {
        int i = 0;
        try (Producer<String, String> producer = createProducer(Map.of(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
        ))) {
            while (!prodState.done().get()) {
                String key = "key-" + (i++);
                ProducerRecord<String, String> record = new ProducerRecord<>(
                    tpMulti.topic(),
                    tpMulti.partition(),
                    null,
                    key,
                    "value"
                );
                try {
                    producer.send(record);
                    producer.flush();
                    // count only correctly produced records
                    prodState.count().incrementAndGet();
                    produced.add(key);
                } catch (Exception e) {
                    // ignore
                }
            }
        }
    });

    // consume messages - start after a small delay
    ClientState consState = new ClientState();
    // using a map here in case we want to debug specific keys
    Map<String, Integer> consumed = new HashMap<>();
    service.schedule(() -> {
        try (ShareConsumer<String, String> shareConsumer = createShareConsumer(groupId, Map.of(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName(),
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()
        ))) {
            shareConsumer.subscribe(List.of(topicName));
            while (!consState.done().get()) {
                ConsumerRecords<String, String> records = shareConsumer.poll(Duration.ofMillis(2000L));
                consState.count().addAndGet(records.count());
                records.forEach(rec -> consumed.compute(rec.key(), (k, v) -> v == null ? 1 : v + 1));
                if (prodState.done().get() && records.count() == 0) {
                    consState.done().set(true);
                }
            }
        }
    }, 100L, TimeUnit.MILLISECONDS);

    // To be closer to real world scenarios, we trigger the coordinator move only after
    // some time has elapsed since the producer and consumer started working.
    service.schedule(() -> {
        // Get the current node hosting the __share_group_state partition on which tpMulti
        // is hosted. Then shut this node down and wait for it to be gracefully shut down.
        // Then fetch the coordinator again and verify that it has moved to some other broker.
        try (Admin admin = createAdminClient()) {
            SharePartitionKey key = SharePartitionKey.getInstance(groupId, new TopicIdPartition(topicId, tpMulti));
            int shareGroupStateTp = Utils.abs(key.asCoordinatorKey().hashCode()) % 3;
            List<Integer> curShareCoordNodeId = null;
            try {
                curShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
                    .partitions().stream()
                    .filter(info -> info.partition() == shareGroupStateTp)
                    .map(info -> info.leader().id())
                    .toList();
            } catch (Exception e) {
                fail(e);
            }
            assertEquals(1, curShareCoordNodeId.size());

            // shut down the coordinator
            KafkaBroker broker = cluster.brokers().get(curShareCoordNodeId.get(0));
            cluster.shutdownBroker(curShareCoordNodeId.get(0));

            // wait for it to be completely shut down
            broker.awaitShutdown();

            List<Integer> newShareCoordNodeId = null;
            try {
                newShareCoordNodeId = admin.describeTopics(List.of(Topic.SHARE_GROUP_STATE_TOPIC_NAME)).allTopicNames().get().get(Topic.SHARE_GROUP_STATE_TOPIC_NAME)
                    .partitions().stream()
                    .filter(info -> info.partition() == shareGroupStateTp)
                    .map(info -> info.leader().id())
                    .toList();
            } catch (Exception e) {
                fail(e);
            }
            assertEquals(1, newShareCoordNodeId.size());
            assertNotEquals(curShareCoordNodeId.get(0), newShareCoordNodeId.get(0));
        }
    }, 5L, TimeUnit.SECONDS);

    // stop the producer after some time (but after the coordinator shutdown)
    service.schedule(() -> {
        prodState.done().set(true);
    }, 10L, TimeUnit.SECONDS);

    // wait for both producer and consumer to finish
    TestUtils.waitForCondition(
        () -> prodState.done().get() && consState.done().get(),
        45_000L,
        500L,
        () -> "prod/cons not done yet"
    );

    // Make sure we consumed all records. The consumed count can be higher than the
    // produced count due to re-delivery, but that is expected since share groups only
    // guarantee at-least-once semantics.
    assertTrue(prodState.count().get() <= consState.count().get());
    Set<String> consumedKeys = consumed.keySet();
    assertTrue(produced.containsAll(consumedKeys) && consumedKeys.containsAll(produced));

    shutdownExecutorService(service);
    verifyShareGroupStateTopicRecordsProduced();
}
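The shutdown task above locates the coordinator by hashing the share-partition key onto the three partitions of __share_group_state and reading that partition's leader. The following is a self-contained sketch of just that arithmetic; the group, topic, and random topic id are made up, and only the formula is taken from the test.

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.share.SharePartitionKey;

public class ShareCoordinatorPartitionSketch {
    public static void main(String[] args) {
        // Hypothetical group/topic, shaped like the test's "multipartGrp"/"multipart".
        SharePartitionKey key = SharePartitionKey.getInstance(
            "multipartGrp",
            new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("multipart", 0)));

        int numStatePartitions = 3; // __share_group_state is created with 3 partitions in this test
        int statePartition = Utils.abs(key.asCoordinatorKey().hashCode()) % numStatePartitions;

        // The broker leading __share_group_state-<statePartition> acts as the share coordinator
        // for this key; shutting that broker down forces the coordinator to move.
        System.out.println("share coordinator partition: " + statePartition);
    }
}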
@ClusterTest(
@ -1929,6 +1988,8 @@ public class ShareConsumerTest {
@ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")
}
)
@Timeout(150)
@Flaky("KAFKA-18665")
public void testComplexShareConsumer() throws Exception {
setup();
String topicName = "multipart";
@ -1936,19 +1997,18 @@ public class ShareConsumerTest {
    createTopic(topicName, 3, 3);
    TopicPartition multiTp = new TopicPartition(topicName, 0);

    ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
    ClientState prodState = new ClientState();

    // produce messages until we are told to stop
    service.execute(() -> {
        try (Producer<byte[], byte[]> producer = createProducer()) {
            while (!prodState.done().get()) {
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(multiTp.topic(), multiTp.partition(), null, "key".getBytes(), "value".getBytes());
                producer.send(record);
                producer.flush();
                prodState.count().incrementAndGet();
            }
        }
    });
@ -1961,28 +2021,45 @@ public class ShareConsumerTest {
Map.of()
);
    service.schedule(
        complexCons1,
        100L,
        TimeUnit.MILLISECONDS
    );

    // let the complex consumer read the messages, then stop the producer
    service.schedule(() -> {
        prodState.done().set(true);
    }, 10L, TimeUnit.SECONDS);

    // all messages which can be read are read; some may be redelivered
    TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!");
    assertTrue(prodState.count().get() < complexCons1.recordsRead());

    shutdownExecutorService(service);
    verifyShareGroupStateTopicRecordsProduced();
}
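The ComplexShareConsumer driven here (defined further down) is parameterized by an exit predicate and a per-record callback. The fragment below is a hedged sketch of how those two hooks might be wired, matching the field types shown later and assuming the test class's existing imports; the acknowledge-everything callback is illustrative, not necessarily what this test passes.

// Hypothetical wiring for the two ComplexShareConsumer hooks:
// stop once a poll returns nothing, and acknowledge each record that was read.
Predicate<ConsumerRecords<byte[], byte[]>> exitCriteria = records -> records.count() == 0;
BiConsumer<ShareConsumer<byte[], byte[]>, ConsumerRecord<byte[], byte[]>> processFunc =
    (consumer, record) -> consumer.acknowledge(record);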
/**
 * Util class to encapsulate state for a consumer/producer
 * being executed by an {@link ExecutorService}.
 */
private static class ClientState {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final AtomicInteger count = new AtomicInteger(0);

    AtomicBoolean done() {
        return done;
    }

    AtomicInteger count() {
        return count;
    }
}
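ClientState is just a pair of thread-safe cells shared between the test thread and a background client task: the task loops on done() and publishes progress through count(). Below is a minimal usage sketch of that pattern; the method is illustrative only and would have to live inside ShareConsumerTest (ClientState is a private nested class, and it reuses the shutdownExecutorService helper shown further down).

private void clientStateUsageSketch() throws Exception {
    ClientState state = new ClientState();
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(() -> {
        while (!state.done().get()) {
            // ... one unit of work, e.g. producer.send(...) or shareConsumer.poll(...)
            state.count().incrementAndGet();
        }
    });
    Thread.sleep(100L);      // let the worker make some progress
    state.done().set(true);  // ask it to stop
    shutdownExecutorService(pool);
    assertTrue(state.count().get() > 0);
}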
private int produceMessages(int messageCount) {
try (Producer<byte[], byte[]> producer = createProducer()) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
@ -2217,9 +2294,7 @@ public class ShareConsumerTest {
private final String topicName;
private final Map<String, Object> configs = new HashMap<>();
private final ClientState state = new ClientState();
private final Predicate<ConsumerRecords<K, V>> exitCriteria;
private final BiConsumer<ShareConsumer<K, V>, ConsumerRecord<K, V>> processFunc;
@ -2267,31 +2342,42 @@ public class ShareConsumerTest {
}

void stop() {
    state.done().set(true);
}

@Override
public void run() {
    try (ShareConsumer<K, V> consumer = new KafkaShareConsumer<>(configs)) {
        consumer.subscribe(Set.of(this.topicName));
        while (!state.done().get()) {
            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
            state.count().addAndGet(records.count());
            if (exitCriteria.test(records)) {
                state.done().set(true);
            }
            records.forEach(record -> processFunc.accept(consumer, record));
        }
    }
}

boolean isDone() {
    return state.done().get();
}

int recordsRead() {
    return state.count().get();
}
}

private void shutdownExecutorService(ExecutorService service) {
    service.shutdown();
    try {
        if (!service.awaitTermination(5L, TimeUnit.SECONDS)) {
            service.shutdownNow();
        }
    } catch (Exception e) {
        service.shutdownNow();
        Thread.currentThread().interrupt();
    }
}
}
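For reference, shutdownExecutorService above is a compact variant of the two-phase shutdown idiom documented for java.util.concurrent.ExecutorService. The fuller form, sketched below, additionally waits for cancelled tasks to finish; it is not part of this change.

static void shutdownAndAwait(ExecutorService pool) {
    pool.shutdown();                    // stop accepting new tasks
    try {
        if (!pool.awaitTermination(5L, TimeUnit.SECONDS)) {
            pool.shutdownNow();         // interrupt anything still running
            if (!pool.awaitTermination(5L, TimeUnit.SECONDS)) {
                System.err.println("executor did not terminate");
            }
        }
    } catch (InterruptedException e) {
        pool.shutdownNow();
        Thread.currentThread().interrupt(); // preserve the caller's interrupt status
    }
}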