KAFKA-18827: Initialize share group state group coordinator impl. [3/N] (#19026)

* This PR adds the implementation of the initialize share groups call from the
Group Coordinator's perspective.
* The initialize call on persister instance will be invoked by the
`GroupCoordinatorService`, based on the response of the
`GroupCoordinatorShard.shareGroupHeartbeat`. If there is a new topic
subscription or a member assignment change (topic partitions incremented),
the delta share partitions corresponding to the share group in question
are returned as an optional initialize request.
* The request is then sent to the share coordinator as an encapsulated
timer task because we want the heartbeat response to go asynchronously.
* Tests have been added for `GroupCoordinatorService` and
`GroupMetadataManager`. Existing tests have also been updated.
* A new formatter `ShareGroupStatePartitionMetadataFormatter` has been
added for debugging.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Sushant Mahajan 2025-03-27 01:10:23 +05:30 committed by GitHub
parent 56d1dc1b6e
commit eb88e78373
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1641 additions and 192 deletions

View File

@ -23,8 +23,10 @@ import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.ShareMemberDescription;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -94,6 +96,7 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -104,6 +107,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@SuppressWarnings("ClassFanOutComplexity")
@Timeout(1200)
@Tag("integration")
@ClusterTestDefaults(
@ -249,7 +253,7 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
verifyShareGroupStateTopicRecordsProduced();
}
@ -265,7 +269,7 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
producer.send(record);
records = shareConsumer.poll(Duration.ofMillis(5000));
@ -289,7 +293,7 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
shareConsumer.subscribe(List.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
verifyShareGroupStateTopicRecordsProduced();
}
@ -375,7 +379,7 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
// Now in the second poll, we implicitly acknowledge the record received in the first poll.
@ -406,7 +410,7 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
// Waiting until the acquisition lock expires.
@ -563,7 +567,7 @@ public class ShareConsumerTest {
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 4);
assertEquals(4, records.count());
assertEquals(transactional1.offset(), records.records(tp).get(0).offset());
assertEquals(nonTransactional1.offset(), records.records(tp).get(1).offset());
@ -591,7 +595,7 @@ public class ShareConsumerTest {
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
records.forEach(shareConsumer::acknowledge);
producer.send(record);
@ -612,7 +616,7 @@ public class ShareConsumerTest {
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
records.forEach(shareConsumer::acknowledge);
producer.send(record);
@ -646,7 +650,7 @@ public class ShareConsumerTest {
Map<TopicPartition, Exception> partitionExceptionMap1 = new HashMap<>();
shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap1, partitionExceptionMap1));
ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer1, 2500L, 3);
assertEquals(3, records.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
@ -737,7 +741,7 @@ public class ShareConsumerTest {
Map<TopicPartition, Exception> partitionExceptionMap = new HashMap<>();
shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
ConsumerRecords<byte[], byte[]> records = shareConsumer1.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer1, 2500L, 3);
assertEquals(3, records.count());
Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
@ -791,7 +795,7 @@ public class ShareConsumerTest {
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
records = shareConsumer.poll(Duration.ofMillis(5000));
@ -813,7 +817,7 @@ public class ShareConsumerTest {
producer.send(record);
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
@ -834,7 +838,7 @@ public class ShareConsumerTest {
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
verifyShareGroupStateTopicRecordsProduced();
@ -852,7 +856,7 @@ public class ShareConsumerTest {
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
shareConsumer.acknowledge(consumedRecord);
@ -874,7 +878,7 @@ public class ShareConsumerTest {
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
ConsumerRecord<byte[], byte[]> consumedRecord = records.records(tp).get(0);
records = shareConsumer.poll(Duration.ofMillis(500));
@ -885,7 +889,7 @@ public class ShareConsumerTest {
}
@ClusterTest
public void testImplicitAcknowledgeCommitSync() {
public void testImplicitAcknowledgeCommitSync() throws InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1")) {
@ -895,7 +899,17 @@ public class ShareConsumerTest {
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
AtomicReference<ConsumerRecords<byte[], byte[]>> recordsAtomic = new AtomicReference<>();
waitForCondition(() -> {
ConsumerRecords<byte[], byte[]> recs = shareConsumer.poll(Duration.ofMillis(2500L));
recordsAtomic.set(recs);
return recs.count() == 1;
},
DEFAULT_MAX_WAIT_MS,
500L,
() -> "records not found"
);
ConsumerRecords<byte[], byte[]> records = recordsAtomic.get();
assertEquals(1, records.count());
Map<TopicIdPartition, Optional<KafkaException>> result = shareConsumer.commitSync();
assertEquals(1, result.size());
@ -928,7 +942,7 @@ public class ShareConsumerTest {
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap1, partitionExceptionMap1));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 3);
assertEquals(3, records.count());
// Implicitly acknowledging all the records received.
@ -950,16 +964,16 @@ public class ShareConsumerTest {
public void testConfiguredExplicitAcknowledgeCommitSuccess() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) {
ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
"group1",
Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
producer.send(record);
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
records.forEach(shareConsumer::acknowledge);
producer.send(record);
@ -984,7 +998,7 @@ public class ShareConsumerTest {
producer.flush();
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(records.iterator().next()));
}
@ -1008,7 +1022,7 @@ public class ShareConsumerTest {
shareConsumer.subscribe(Set.of(tp.topic()));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 2);
assertEquals(2, records.count());
verifyShareGroupStateTopicRecordsProduced();
}
@ -1102,7 +1116,7 @@ public class ShareConsumerTest {
@ClusterTest
public void testMultipleConsumersInGroupConcurrentConsumption()
throws InterruptedException, ExecutionException, TimeoutException {
throws InterruptedException, ExecutionException, TimeoutException {
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
int consumerCount = 4;
@ -1122,9 +1136,9 @@ public class ShareConsumerTest {
for (int i = 0; i < consumerCount; i++) {
final int consumerNumber = i + 1;
consumerFutures.add(CompletableFuture.supplyAsync(() ->
consumeMessages(totalMessagesConsumed,
producerCount * messagesPerProducer, groupId, consumerNumber,
30, true, maxBytes)));
consumeMessages(totalMessagesConsumed,
producerCount * messagesPerProducer, groupId, consumerNumber,
30, true, maxBytes)));
}
CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new)).get(60, TimeUnit.SECONDS);
@ -1136,7 +1150,7 @@ public class ShareConsumerTest {
@ClusterTest
public void testMultipleConsumersInMultipleGroupsConcurrentConsumption()
throws ExecutionException, InterruptedException, TimeoutException {
throws ExecutionException, InterruptedException, TimeoutException {
AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0);
AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0);
AtomicInteger totalMessagesConsumedGroup3 = new AtomicInteger(0);
@ -1159,7 +1173,7 @@ public class ShareConsumerTest {
}
// Wait for the producers to run
assertDoesNotThrow(() -> CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new))
.get(15, TimeUnit.SECONDS), "Exception awaiting produceMessages");
.get(15, TimeUnit.SECONDS), "Exception awaiting produceMessages");
int actualMessageSent = producerFutures.stream().mapToInt(CompletableFuture::join).sum();
List<CompletableFuture<Integer>> consumeMessagesFutures1 = new ArrayList<>();
@ -1170,21 +1184,21 @@ public class ShareConsumerTest {
for (int i = 0; i < 2; i++) {
final int consumerNumber = i + 1;
consumeMessagesFutures1.add(CompletableFuture.supplyAsync(() ->
consumeMessages(totalMessagesConsumedGroup1, totalMessagesSent,
"group1", consumerNumber, 100, true, maxBytes)));
consumeMessages(totalMessagesConsumedGroup1, totalMessagesSent,
"group1", consumerNumber, 100, true, maxBytes)));
consumeMessagesFutures2.add(CompletableFuture.supplyAsync(() ->
consumeMessages(totalMessagesConsumedGroup2, totalMessagesSent,
"group2", consumerNumber, 100, true, maxBytes)));
consumeMessages(totalMessagesConsumedGroup2, totalMessagesSent,
"group2", consumerNumber, 100, true, maxBytes)));
consumeMessagesFutures3.add(CompletableFuture.supplyAsync(() ->
consumeMessages(totalMessagesConsumedGroup3, totalMessagesSent,
"group3", consumerNumber, 100, true, maxBytes)));
consumeMessages(totalMessagesConsumedGroup3, totalMessagesSent,
"group3", consumerNumber, 100, true, maxBytes)));
}
CompletableFuture.allOf(Stream.of(consumeMessagesFutures1.stream(), consumeMessagesFutures2.stream(),
consumeMessagesFutures3.stream()).flatMap(Function.identity()).toArray(CompletableFuture[]::new))
.get(120, TimeUnit.SECONDS);
consumeMessagesFutures3.stream()).flatMap(Function.identity()).toArray(CompletableFuture[]::new))
.get(120, TimeUnit.SECONDS);
int totalResult1 = consumeMessagesFutures1.stream().mapToInt(CompletableFuture::join).sum();
int totalResult2 = consumeMessagesFutures2.stream().mapToInt(CompletableFuture::join).sum();
@ -1246,7 +1260,7 @@ public class ShareConsumerTest {
@ClusterTest
public void testMultipleConsumersInGroupFailureConcurrentConsumption()
throws InterruptedException, ExecutionException, TimeoutException {
throws InterruptedException, ExecutionException, TimeoutException {
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
int consumerCount = 4;
@ -1266,19 +1280,19 @@ public class ShareConsumerTest {
// The "failing" consumer polls but immediately closes, which releases the records for the other consumers
CompletableFuture<Integer> failedMessagesConsumedFuture = CompletableFuture.supplyAsync(
() -> consumeMessages(new AtomicInteger(0), producerCount * messagesPerProducer, groupId,
0, 1, false));
() -> consumeMessages(new AtomicInteger(0), producerCount * messagesPerProducer, groupId,
0, 1, false));
// Wait for the failed consumer to run
assertDoesNotThrow(() -> failedMessagesConsumedFuture.get(15, TimeUnit.SECONDS),
"Exception awaiting consumeMessages");
"Exception awaiting consumeMessages");
List<CompletableFuture<Integer>> consumeMessagesFutures = new ArrayList<>();
for (int i = 0; i < consumerCount; i++) {
final int consumerNumber = i + 1;
consumeMessagesFutures.add(CompletableFuture.supplyAsync(
() -> consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer,
groupId, consumerNumber, 40, true, maxBytes)));
() -> consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer,
groupId, consumerNumber, 40, true, maxBytes)));
}
CompletableFuture.allOf(produceMessageFutures.toArray(CompletableFuture[]::new)).get(60, TimeUnit.SECONDS);
@ -1308,7 +1322,7 @@ public class ShareConsumerTest {
// Poll twice to receive records. The first poll fetches the record and starts the acquisition lock timer.
// Since we are only sending one record and the acquisition lock hasn't timed out, the second poll only
// acknowledges the record from the first poll and does not fetch any more records.
ConsumerRecords<byte[], byte[]> consumerRecords = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> consumerRecords = waitedPoll(shareConsumer, 2500L, 1);
ConsumerRecord<byte[], byte[]> consumerRecord = consumerRecords.records(tp).get(0);
assertEquals("key_1", new String(consumerRecord.key()));
assertEquals("value_1", new String(consumerRecord.value()));
@ -1544,7 +1558,7 @@ public class ShareConsumerTest {
shareConsumer.wakeup();
assertThrows(WakeupException.class, () -> shareConsumer.poll(Duration.ZERO));
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
assertEquals(1, records.count());
verifyShareGroupStateTopicRecordsProduced();
}
@ -1676,7 +1690,7 @@ public class ShareConsumerTest {
// Producing a record.
producer.send(record);
producer.flush();
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 0, true, "group1", List.of(tp));
// No records should be consumed because share.auto.offset.reset has a default of "latest". Since the record
// was produced before share partition was initialized (which happens after the first share fetch request
// in the poll method), the start offset would be the latest offset, i.e. 1 (the next offset after the already
@ -1703,7 +1717,7 @@ public class ShareConsumerTest {
// Producing a record.
producer.send(record);
producer.flush();
ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
// Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume
// all messages present on the partition
assertEquals(1, records.count());
@ -1758,12 +1772,12 @@ public class ShareConsumerTest {
// Producing a record.
producer.send(record);
producer.flush();
ConsumerRecords<byte[], byte[]> records1 = shareConsumerEarliest.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records1 = waitedPoll(shareConsumerEarliest, 2500L, 1);
// Since the value for share.auto.offset.reset has been altered to "earliest", the consumer should consume
// all messages present on the partition
assertEquals(1, records1.count());
ConsumerRecords<byte[], byte[]> records2 = shareConsumerLatest.poll(Duration.ofMillis(5000));
ConsumerRecords<byte[], byte[]> records2 = waitedPoll(shareConsumerLatest, 2500L, 0, true, "group2", List.of(tp));
// Since the value for share.auto.offset.reset has been altered to "latest", the consumer should not consume
// any message
assertEquals(0, records2.count());
@ -2114,14 +2128,14 @@ public class ShareConsumerTest {
}
private int consumeMessages(AtomicInteger totalMessagesConsumed,
int totalMessages,
String groupId,
int consumerNumber,
int maxPolls,
boolean commit) {
int totalMessages,
String groupId,
int consumerNumber,
int maxPolls,
boolean commit) {
return assertDoesNotThrow(() -> {
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
groupId)) {
groupId)) {
shareConsumer.subscribe(Set.of(tp.topic()));
return consumeMessages(shareConsumer, totalMessagesConsumed, totalMessages, consumerNumber, maxPolls, commit);
}
@ -2129,16 +2143,16 @@ public class ShareConsumerTest {
}
private int consumeMessages(AtomicInteger totalMessagesConsumed,
int totalMessages,
String groupId,
int consumerNumber,
int maxPolls,
boolean commit,
int maxFetchBytes) {
int totalMessages,
String groupId,
int consumerNumber,
int maxPolls,
boolean commit,
int maxFetchBytes) {
return assertDoesNotThrow(() -> {
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
groupId,
Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxFetchBytes))) {
groupId,
Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxFetchBytes))) {
shareConsumer.subscribe(Set.of(tp.topic()));
return consumeMessages(shareConsumer, totalMessagesConsumed, totalMessages, consumerNumber, maxPolls, commit);
}
@ -2146,11 +2160,11 @@ public class ShareConsumerTest {
}
private int consumeMessages(ShareConsumer<byte[], byte[]> consumer,
AtomicInteger totalMessagesConsumed,
int totalMessages,
int consumerNumber,
int maxPolls,
boolean commit) {
AtomicInteger totalMessagesConsumed,
int totalMessages,
int consumerNumber,
int maxPolls,
boolean commit) {
return assertDoesNotThrow(() -> {
int messagesConsumed = 0;
int retries = 0;
@ -2318,6 +2332,7 @@ public class ShareConsumerTest {
* <p></p>
* This can be used to create different consume patterns on the broker and study
* the status of broker side share group abstractions.
*
* @param <K> - key type of the records consumed
* @param <V> - value type of the records consumed
*/
@ -2414,6 +2429,69 @@ public class ShareConsumerTest {
}
}
/**
 * Convenience overload of {@code waitedPoll} that skips the assignment check.
 * <p>
 * Delegates to the six-argument overload with {@code checkAssignment} disabled, an empty
 * group id and an empty partition list.
 *
 * @param shareConsumer the share consumer to poll
 * @param pollMs        the timeout, in milliseconds, passed to each individual poll
 * @param recordCount   the exact number of records a single poll must return
 * @return the records returned by the last poll that was attempted
 */
private ConsumerRecords<byte[], byte[]> waitedPoll(ShareConsumer<byte[], byte[]> shareConsumer,
                                                   long pollMs,
                                                   int recordCount) {
    final boolean checkAssignment = false;
    final String unusedGroupId = "";
    final List<TopicPartition> unusedPartitions = List.of();
    return waitedPoll(shareConsumer, pollMs, recordCount, checkAssignment, unusedGroupId, unusedPartitions);
}
/**
 * Polls the given share consumer repeatedly until a single poll returns exactly
 * {@code recordCount} records.
 * <p>
 * The retry loop is driven by {@code waitForCondition}, called with {@code DEFAULT_MAX_WAIT_MS}
 * and {@code 500L} (presumably the maximum wait and the retry interval in milliseconds;
 * confirm against {@code TestUtils.waitForCondition}). The result of every poll is remembered,
 * so the value returned is that of the most recent poll.
 *
 * @param shareConsumer   the share consumer to poll
 * @param pollMs          the timeout, in milliseconds, passed to each individual poll
 * @param recordCount     the exact number of records a single poll must return
 * @param checkAssignment when {@code true}, {@code waitForAssignment(groupId, tps)} is invoked
 *                        after each poll, before the record count is evaluated
 * @param groupId         the share group id used for the assignment check
 * @param tps             the partitions that must be assigned for the assignment check to pass
 * @return the records returned by the last poll that was attempted
 * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is restored
 *                          before the exception is thrown
 */
private ConsumerRecords<byte[], byte[]> waitedPoll(
    ShareConsumer<byte[], byte[]> shareConsumer,
    long pollMs,
    int recordCount,
    boolean checkAssignment,
    String groupId,
    List<TopicPartition> tps
) {
    AtomicReference<ConsumerRecords<byte[], byte[]>> recordsAtomic = new AtomicReference<>();
    try {
        waitForCondition(() -> {
                ConsumerRecords<byte[], byte[]> recs = shareConsumer.poll(Duration.ofMillis(pollMs));
                // Keep the latest result so it can be handed back once the condition is met.
                recordsAtomic.set(recs);
                if (checkAssignment) {
                    waitForAssignment(groupId, tps);
                }
                return recs.count() == recordCount;
            },
            DEFAULT_MAX_WAIT_MS,
            500L,
            () -> "failed to get records"
        );
        return recordsAtomic.get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Waits until every partition in {@code tps} is assigned to some member of the share group
 * {@code groupId}.
 * <p>
 * Each attempt creates an admin client via {@code createAdminClient()}, describes the share
 * group, collects the partitions from every member that has a non-null assignment, and checks
 * that the collected set contains all of {@code tps}. The admin client is closed after each
 * attempt. Attempts are driven by {@code waitForCondition}, called with
 * {@code DEFAULT_MAX_WAIT_MS} and {@code 1000L} (presumably the maximum wait and the retry
 * interval in milliseconds; confirm against {@code TestUtils.waitForCondition}).
 *
 * @param groupId the share group to describe
 * @param tps     the partitions that must all be assigned
 * @throws RuntimeException if the waiting thread is interrupted; the interrupt flag is restored
 *                          before the exception is thrown
 */
private void waitForAssignment(String groupId, List<TopicPartition> tps) {
    try {
        waitForCondition(() -> {
                try (Admin admin = createAdminClient()) {
                    Collection<ShareMemberDescription> members = admin.describeShareGroups(List.of(groupId),
                        new DescribeShareGroupsOptions().includeAuthorizedOperations(true)
                    ).describedGroups().get(groupId).get().members();
                    Set<TopicPartition> assigned = new HashSet<>();
                    for (ShareMemberDescription desc : members) {
                        // Members without an assignment contribute no partitions.
                        if (desc.assignment() != null) {
                            assigned.addAll(desc.assignment().topicPartitions());
                        }
                    }
                    return assigned.containsAll(tps);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            },
            DEFAULT_MAX_WAIT_MS,
            1000L,
            () -> "tps not assigned to members"
        );
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
public static class SerializerImpl implements Serializer<byte[]> {
@Override

View File

@ -2726,8 +2726,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
client.listGroups(options).all.get.stream().filter(_.groupId == testGroupId).count() == 0
}, s"Expected to find zero groups")
val describeWithFakeGroupResult = client.describeShareGroups(util.Arrays.asList(testGroupId, fakeGroupId),
new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
var describeWithFakeGroupResult: DescribeShareGroupsResult = null
TestUtils.waitUntilTrue(() => {
describeWithFakeGroupResult = client.describeShareGroups(util.Arrays.asList(testGroupId, fakeGroupId),
new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
val members = describeWithFakeGroupResult.describedGroups().get(testGroupId).get().members()
members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic()).nonEmpty
}, s"Could not get partitions assigned. Last response $describeWithFakeGroupResult.")
assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
// Test that we can get information about the test share group.

View File

@ -125,7 +125,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
// Verify the response.
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, shareGroupHeartbeatResponse.data.assignment)
// Leave the group.
@ -232,14 +232,26 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// Heartbeats until the partitions are assigned for member 1.
shareGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && shareGroupHeartbeatResponse.data.assignment != null
if (shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && shareGroupHeartbeatResponse.data().assignment() != null) {
true
} else {
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId1)
.setMemberEpoch(shareGroupHeartbeatResponse.data.memberEpoch()),
true
).build()
false
}
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
val topicPartitionsAssignedToMember1 = shareGroupHeartbeatResponse.data.assignment.topicPartitions()
// Verify the response.
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
// Prepare the next heartbeat for member 2.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
@ -259,7 +271,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
val topicPartitionsAssignedToMember2 = shareGroupHeartbeatResponse.data.assignment.topicPartitions()
// Verify the response.
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
val partitionsAssigned: util.Set[Integer] = new util.HashSet[Integer]()
topicPartitionsAssignedToMember1.forEach(topicPartition => {
@ -290,7 +302,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
// Verify the response.
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
} finally {
admin.close()
}
@ -369,7 +381,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
// Verify the response.
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
// Member leaves the group.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
@ -402,7 +414,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
// Verify the response for member 1.
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(memberId, shareGroupHeartbeatResponse.data.memberId)
// Partition assignment remains intact on rejoining.
assertEquals(expectedAssignment, shareGroupHeartbeatResponse.data.assignment)
@ -491,7 +503,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not get partitions for topic foo and bar assigned. Last response $shareGroupHeartbeatResponse.")
// Verify the response.
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
// Create the topic baz.
val bazTopicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
@ -515,7 +527,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
.setMemberEpoch(2),
.setMemberEpoch(3),
true
).build()
@ -527,7 +539,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not get partitions for topic baz assigned. Last response $shareGroupHeartbeatResponse.")
// Verify the response.
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
// Increasing the partitions of topic bar which is already being consumed in the share group.
increasePartitions(admin, "bar", 6, Seq.empty)
@ -547,7 +559,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
.setMemberEpoch(3),
.setMemberEpoch(5),
true
).build()
@ -559,7 +571,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not update partitions assignment for topic bar. Last response $shareGroupHeartbeatResponse.")
// Verify the response.
assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(7, shareGroupHeartbeatResponse.data.memberEpoch)
// Delete the topic foo.
TestUtils.deleteTopicWithAdmin(
admin = admin,
@ -581,7 +593,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
.setMemberEpoch(4),
.setMemberEpoch(7),
true
).build()
@ -593,7 +605,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not update partitions assignment for topic foo. Last response $shareGroupHeartbeatResponse.")
// Verify the response.
assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(8, shareGroupHeartbeatResponse.data.memberEpoch)
} finally {
admin.close()
}
@ -704,12 +716,24 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
.setTopicId(barId)
.setPartitions(List[Integer](0).asJava)).asJava)
shareGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
shareGroupHeartbeatResponse = connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
shareGroupHeartbeatResponse.data.assignment != null &&
if (shareGroupHeartbeatResponse.data.assignment != null &&
expectedAssignment.topicPartitions.containsAll(shareGroupHeartbeatResponse.data.assignment.topicPartitions) &&
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)) {
true
} else {
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
.setMemberEpoch(shareGroupHeartbeatResponse.data.memberEpoch),
true
).build()
false
}
}, msg = s"Could not get bar partitions assigned. Last response $shareGroupHeartbeatResponse.")
// Verify the response, the epoch should have been bumped.
@ -840,7 +864,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response $shareGroupHeartbeatResponse.")
// Verify the response.
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
// Restart the only running broker.
val broker = cluster.brokers().values().iterator().next()
@ -864,7 +888,7 @@ class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// Verify the response. Epoch should not have changed and null assignments determines that no
// change in old assignment.
assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
assertNull(shareGroupHeartbeatResponse.data.assignment)
} finally {
admin.close()

View File

@ -30,7 +30,7 @@ public interface SubscribedTopicDescriber {
*
* @param topicId Uuid corresponding to the topic.
* @return The number of partitions corresponding to the given topic Id,
* or -1 if the topic Id does not exist.
* or -1 if the topic id does not exist.
*/
int numPartitions(Uuid topicId);
@ -43,4 +43,14 @@ public interface SubscribedTopicDescriber {
* If the topic Id does not exist, an empty set is returned.
*/
Set<String> racksForPartition(Uuid topicId, int partition);
/**
 * Returns the partitions of the given topic which are eligible for assignment,
 * i.e. which are allowlisted based on some criteria. For example, for share groups
 * only partitions which have been initialized are returned.
 *
 * @param topicId The uuid of the topic.
 * @return The set of integers representing assignable partitions. Could be empty, or contain all partitions.
 */
Set<Integer> assignablePartitions(Uuid topicId);
}

View File

@ -47,6 +47,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
@ -812,6 +814,44 @@ public class GroupCoordinatorRecordHelpers {
);
}
/**
 * Builds the ShareGroupStatePartitionMetadata record of a share group.
 *
 * @param groupId           The share group id.
 * @param initializedTopics The initialized topics keyed by topic id; each value pairs the
 *                          topic name with the set of initialized partition ids.
 * @param deletingTopics    The topics which are being deleted, as topic id to topic name.
 * @return The record, written with value version 0.
 */
public static CoordinatorRecord newShareGroupStatePartitionMetadataRecord(
    String groupId,
    Map<Uuid, Map.Entry<String, Set<Integer>>> initializedTopics,
    Map<Uuid, String> deletingTopics
) {
    // One TopicPartitionsInfo per initialized topic.
    List<ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo> initialized = initializedTopics.entrySet().stream()
        .map(topic -> {
            Map.Entry<String, Set<Integer>> nameAndPartitions = topic.getValue();
            return new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
                .setTopicId(topic.getKey())
                .setTopicName(nameAndPartitions.getKey())
                .setPartitions(nameAndPartitions.getValue().stream().toList());
        })
        .toList();

    // One TopicInfo per topic under deletion.
    List<ShareGroupStatePartitionMetadataValue.TopicInfo> deleting = deletingTopics.entrySet().stream()
        .map(topic -> new ShareGroupStatePartitionMetadataValue.TopicInfo()
            .setTopicId(topic.getKey())
            .setTopicName(topic.getValue()))
        .toList();

    ShareGroupStatePartitionMetadataKey recordKey = new ShareGroupStatePartitionMetadataKey()
        .setGroupId(groupId);
    ShareGroupStatePartitionMetadataValue recordValue = new ShareGroupStatePartitionMetadataValue()
        .setInitializedTopics(initialized)
        .setDeletingTopics(deleting);

    return CoordinatorRecord.record(
        recordKey,
        new ApiMessageAndVersion(recordValue, (short) 0)
    );
}
private static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> toTopicPartitions(
Map<Uuid, Set<Integer>> topicPartitions
) {

View File

@ -88,13 +88,18 @@ import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateResult;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateResult;
import org.apache.kafka.server.share.persister.PartitionErrorData;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionStateData;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.slf4j.Logger;
@ -115,6 +120,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
@ -242,7 +248,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
runtime,
groupCoordinatorMetrics,
groupConfigManager,
persister
persister,
timer
);
}
}
@ -294,6 +301,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
*/
private MetadataImage metadataImage = null;
private Timer timer;
/**
*
* @param logContext The log context.
@ -309,7 +318,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
GroupCoordinatorMetrics groupCoordinatorMetrics,
GroupConfigManager groupConfigManager,
Persister persister
Persister persister,
Timer timer
) {
this.log = logContext.logger(GroupCoordinatorService.class);
this.config = config;
@ -317,6 +327,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
this.groupCoordinatorMetrics = groupCoordinatorMetrics;
this.groupConfigManager = groupConfigManager;
this.persister = persister;
this.timer = timer;
}
/**
@ -435,7 +446,19 @@ public class GroupCoordinatorService implements GroupCoordinator {
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs()),
coordinator -> coordinator.shareGroupHeartbeat(context, request)
).exceptionally(exception -> handleOperationException(
).thenCompose(result -> {
// This ensures that the previous group write has completed successfully
// before we start the persister initialize phase.
if (result.getValue().isPresent()) {
timer.add(new TimerTask(0L) {
@Override
public void run() {
persisterInitialize(result.getValue().get(), result.getKey());
}
});
}
return CompletableFuture.completedFuture(result.getKey());
}).exceptionally(exception -> handleOperationException(
"share-group-heartbeat",
request,
exception,
@ -446,6 +469,81 @@ public class GroupCoordinatorService implements GroupCoordinator {
));
}
/**
 * Sends the initialize request to the persister and passes the persister result to
 * handlePersisterInitializeResponse. Any failure of that chain is logged and converted
 * into a heartbeat response carrying the matching error code and message.
 *
 * @param request         The persister initialize request.
 * @param defaultResponse The heartbeat response handed to handlePersisterInitializeResponse.
 * @return A future holding the resulting ShareGroupHeartbeat response.
 */
// Visibility for testing
CompletableFuture<ShareGroupHeartbeatResponseData> persisterInitialize(
    InitializeShareGroupStateParameters request,
    ShareGroupHeartbeatResponseData defaultResponse
) {
    return persister.initializeState(request)
        .thenCompose(persisterResult -> {
            String groupId = request.groupTopicPartitionData().groupId();
            return handlePersisterInitializeResponse(groupId, persisterResult, defaultResponse);
        })
        .exceptionally(failure -> {
            GroupTopicPartitionData<PartitionStateData> requestedData = request.groupTopicPartitionData();
            log.error("Unable to initialize share group state {}, {}", requestedData.groupId(), requestedData.topicsData(), failure);
            Errors mappedError = Errors.forException(failure);
            ShareGroupHeartbeatResponseData errorResponse = new ShareGroupHeartbeatResponseData();
            errorResponse.setErrorCode(mappedError.code());
            errorResponse.setErrorMessage(mappedError.message());
            return errorResponse;
        });
}
/**
 * Inspects the persister initialize result. If any partition reports an error, that error
 * is logged and returned as an error response. Otherwise the initialized topic partitions
 * are collected and, unless there are none, recorded via
 * performShareGroupStateMetadataInitialize.
 *
 * @param groupId                   The share group id.
 * @param persisterInitializeResult The result returned by the persister.
 * @param defaultResponse           The response to return when nothing goes wrong.
 * @return A future holding the resulting ShareGroupHeartbeat response.
 */
private CompletableFuture<ShareGroupHeartbeatResponseData> handlePersisterInitializeResponse(
    String groupId,
    InitializeShareGroupStateResult persisterInitializeResult,
    ShareGroupHeartbeatResponseData defaultResponse
) {
    // Stop at the first topic which has a partition with a non-NONE error code.
    Errors firstError = Errors.NONE;
    for (TopicData<PartitionErrorData> topic : persisterInitializeResult.topicsData()) {
        Optional<PartitionErrorData> failedPartition = topic.partitions().stream()
            .filter(partition -> partition.errorCode() != Errors.NONE.code())
            .findAny();
        if (failedPartition.isPresent()) {
            firstError = Errors.forCode(failedPartition.get().errorCode());
            break;
        }
    }

    if (firstError.code() != Errors.NONE.code()) {
        log.error("Received error while calling initialize state for {} on persister {}.", groupId, firstError.code());
        ShareGroupHeartbeatResponseData errorResponse = new ShareGroupHeartbeatResponseData()
            .setErrorCode(firstError.code())
            .setErrorMessage(firstError.message());
        return CompletableFuture.completedFuture(errorResponse);
    }

    // No errors: gather the partition ids of every topic in the result.
    Map<Uuid, Set<Integer>> initializedPartitions = new HashMap<>();
    persisterInitializeResult.topicsData().forEach(topic ->
        initializedPartitions.put(
            topic.topicId(),
            topic.partitions().stream().map(PartitionErrorData::partition).collect(Collectors.toSet())
        )
    );

    return initializedPartitions.isEmpty()
        ? CompletableFuture.completedFuture(defaultResponse)
        : performShareGroupStateMetadataInitialize(groupId, initializedPartitions, defaultResponse);
}
/**
 * Schedules the "initialize-share-group-state" write operation on the coordinator owning
 * the group, using the offset commit timeout as the write timeout. On success the default
 * response is returned; on failure the error is logged and mapped into an error response.
 *
 * @param groupId           The share group id.
 * @param topicPartitionMap The initialized partitions keyed by topic id.
 * @param defaultResponse   The response to return when the write succeeds.
 * @return A future holding the resulting ShareGroupHeartbeat response.
 */
private CompletableFuture<ShareGroupHeartbeatResponseData> performShareGroupStateMetadataInitialize(
    String groupId,
    Map<Uuid, Set<Integer>> topicPartitionMap,
    ShareGroupHeartbeatResponseData defaultResponse
) {
    CompletableFuture<Void> writeFuture = runtime.scheduleWriteOperation(
        "initialize-share-group-state",
        topicPartitionFor(groupId),
        Duration.ofMillis(config.offsetCommitTimeoutMs()),
        coordinator -> coordinator.initializeShareGroupState(groupId, topicPartitionMap)
    );

    return writeFuture
        .thenApply(ignored -> defaultResponse)
        .exceptionally(failure -> {
            log.error("Unable to initialize share group state partition metadata for {}.", groupId, failure);
            Errors mappedError = Errors.forException(failure);
            ShareGroupHeartbeatResponseData errorResponse = new ShareGroupHeartbeatResponseData();
            errorResponse.setErrorCode(mappedError.code());
            errorResponse.setErrorMessage(mappedError.message());
            return errorResponse;
        });
}
/**
* See {@link GroupCoordinator#joinGroup(RequestContext, JoinGroupRequestData, BufferSupplier)}.
*/

View File

@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
@ -91,6 +92,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
@ -118,6 +121,7 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
@ -412,16 +416,32 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
* @param context The request context.
* @param request The actual ShareGroupHeartbeat request.
*
* @return A Result containing the ShareGroupHeartbeat response and
* a list of records to update the state machine.
* @return A Result containing a pair of ShareGroupHeartbeat response and maybe InitializeShareGroupStateParameters
* and a list of records to update the state machine.
*/
public CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(
public CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> shareGroupHeartbeat(
RequestContext context,
ShareGroupHeartbeatRequestData request
) {
return groupMetadataManager.shareGroupHeartbeat(context, request);
}
/**
 * Produces, if needed, the ShareGroupStatePartitionMetadata records for share partitions
 * which were initialized as part of a share group heartbeat request. The work is
 * delegated to the group metadata manager.
 *
 * @param groupId           The group id corresponding to the share group whose share partitions have been initialized.
 * @param topicPartitionMap Map representing topic partition data to be added to the share state partition metadata.
 *
 * @return A Result containing coordinator records and Void response.
 */
public CoordinatorResult<Void, CoordinatorRecord> initializeShareGroupState(
    String groupId,
    Map<Uuid, Set<Integer>> topicPartitionMap
) {
    CoordinatorResult<Void, CoordinatorRecord> initializeResult =
        groupMetadataManager.initializeShareGroupState(groupId, topicPartitionMap);
    return initializeResult;
}
/**
* Handles a JoinGroup request.
*
@ -1013,6 +1033,13 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord
);
break;
case SHARE_GROUP_STATE_PARTITION_METADATA:
groupMetadataManager.replay(
(ShareGroupStatePartitionMetadataKey) key,
(ShareGroupStatePartitionMetadataValue) Utils.messageOrNull(value)
);
break;
case CONSUMER_GROUP_REGULAR_EXPRESSION:
groupMetadataManager.replay(
(ConsumerGroupRegularExpressionKey) key,

View File

@ -103,6 +103,8 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
@ -150,6 +152,7 @@ import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionIdData;
import org.apache.kafka.server.share.persister.TopicData;
@ -209,6 +212,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.Utils.assignmentToString;
@ -428,6 +432,11 @@ public class GroupMetadataManager {
*/
private final TimelineHashMap<String, TimelineHashSet<String>> groupsByTopics;
/**
* The share group partition metadata info keyed by group id.
*/
private final TimelineHashMap<String, ShareGroupStatePartitionMetadataInfo> shareGroupPartitionMetadata;
/**
* The group manager.
*/
@ -463,6 +472,19 @@ public class GroupMetadataManager {
*/
private final ShareGroupPartitionAssignor shareGroupAssignor;
/**
 * A record class to hold the value representing ShareGroupStatePartitionMetadata for the
 * TimelineHashMap keyed on share group id.
 * <p>
 * The record declares no compact constructor, so both components are stored exactly as
 * passed in (no defensive copies are made).
 *
 * @param initializedTopics Map of set of partition ids keyed on the topic id.
 * @param deletingTopics    Set of topic ids.
 */
private record ShareGroupStatePartitionMetadataInfo(
Map<Uuid, Set<Integer>> initializedTopics,
Set<Uuid> deletingTopics
) {
}
/**
* The authorizer to validate the regex subscription topics.
*/
@ -497,6 +519,7 @@ public class GroupMetadataManager {
this.defaultConsumerGroupAssignor = config.consumerGroupAssignors().get(0);
this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
this.shareGroupPartitionMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.groupConfigManager = groupConfigManager;
this.shareGroupAssignor = shareGroupAssignor;
this.streamsGroupSessionTimeoutMs = 45000;
@ -2190,10 +2213,10 @@ public class GroupMetadataManager {
* @param clientHost The client host.
* @param subscribedTopicNames The list of subscribed topic names from the request or null.
*
* @return A Result containing the ShareGroupHeartbeat response and
* a list of records to update the state machine.
* @return A Result containing a pair of ShareGroupHeartbeat response and maybe InitializeShareGroupStateParameters
* and a list of records to update the state machine.
*/
private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(
private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> shareGroupHeartbeat(
String groupId,
String memberId,
int memberEpoch,
@ -2235,7 +2258,7 @@ public class GroupMetadataManager {
member,
updatedMember,
records
);
) || initializedAssignmentPending(group);
int groupEpoch = group.groupEpoch();
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
@ -2323,10 +2346,112 @@ public class GroupMetadataManager {
// 2. The member's assignment has been updated.
boolean isFullRequest = subscribedTopicNames != null;
if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) {
response.setAssignment(createShareGroupResponseAssignment(updatedMember));
ShareGroupHeartbeatResponseData.Assignment assignment = createShareGroupResponseAssignment(updatedMember);
response.setAssignment(assignment);
}
return new CoordinatorResult<>(records, response);
return new CoordinatorResult<>(
records,
Map.entry(
response,
maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscriptionMetadata)
)
);
}
/**
 * Tells whether the initialized share partitions of the group differ from the partitions
 * found in the group's target assignment.
 *
 * @param group The share group.
 * @return false when the group has no initialized share partitions recorded or has no
 *         members; otherwise true iff the initialized topic partitions are not equal to
 *         the union of the partitions in the target assignment.
 */
private boolean initializedAssignmentPending(ShareGroup group) {
    // Nothing is initialized for the group, or nobody is there to receive an assignment.
    if (!shareGroupPartitionMetadata.containsKey(group.groupId()) || group.isEmpty()) {
        return false;
    }

    Map<Uuid, Set<Integer>> initialized = shareGroupPartitionMetadata.get(group.groupId()).initializedTopics();

    // Union of the target assignment of all members, keyed by topic id.
    Map<Uuid, Set<Integer>> assigned = new HashMap<>();
    group.targetAssignment().values().forEach(memberAssignment ->
        memberAssignment.partitions().forEach((topicId, partitionIds) ->
            assigned.computeIfAbsent(topicId, id -> new HashSet<>()).addAll(partitionIds)
        )
    );

    boolean everythingAssigned = initialized.equals(assigned);
    return !everythingAssigned;
}
/**
 * Computes which subscribed topic partitions of a share group have not been
 * initialized yet.
 *
 * @param groupId              The share group id for which the diff is being calculated.
 * @param subscriptionMetadata The subscription metadata corresponding to the share group.
 * @return A map keyed by topic id whose values pair the topic name with the partitions
 *         which are subscribed by the share group but not initialized yet. Empty when the
 *         topics image or the subscription metadata is null or empty.
 */
// Visibility for testing
Map<Uuid, Map.Entry<String, Set<Integer>>> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
    TopicsImage topicsImage = metadataImage.topics();
    if (topicsImage == null || topicsImage.isEmpty()) {
        return Map.of();
    }
    if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
        return Map.of();
    }

    ShareGroupStatePartitionMetadataInfo info = shareGroupPartitionMetadata.get(groupId);
    Map<Uuid, Set<Integer>> initializedByTopic = info == null ? Map.of() : info.initializedTopics();

    Map<Uuid, Map.Entry<String, Set<Integer>>> changes = new HashMap<>();
    for (Map.Entry<String, TopicMetadata> subscribed : subscriptionMetadata.entrySet()) {
        TopicMetadata topic = subscribed.getValue();
        Set<Integer> initialized = initializedByTopic.getOrDefault(topic.id(), Set.of());

        // Skip topics which already have at least as many initialized partitions as the topic has.
        boolean fullyInitialized = !initialized.isEmpty() && initialized.size() >= topic.numPartitions();
        if (fullyInitialized) {
            continue;
        }

        // All partition ids of the topic minus the ones already initialized.
        Set<Integer> missing = IntStream.range(0, topic.numPartitions()).boxed().collect(Collectors.toSet());
        missing.removeAll(initialized);
        changes.computeIfAbsent(topic.id(), id -> Map.entry(subscribed.getKey(), missing));
    }
    return changes;
}
/**
 * Creates the persister initialize request for the subscribed topic partitions which
 * are not initialized yet, based on the diff computed by subscribedTopicsChangeMap.
 *
 * @param groupId              The share group id for which partitions need to be initialized.
 * @param groupEpoch           The group epoch of the share group.
 * @param subscriptionMetadata The subscription metadata for the share group.
 * @return An optional holding the persister initialize request, or empty when there is
 *         nothing to initialize.
 */
private Optional<InitializeShareGroupStateParameters> maybeCreateInitializeShareGroupStateRequest(
    String groupId,
    int groupEpoch,
    Map<String, TopicMetadata> subscriptionMetadata
) {
    boolean nothingSubscribed = subscriptionMetadata == null || subscriptionMetadata.isEmpty();
    if (nothingSubscribed || metadataImage.isEmpty()) {
        return Optional.empty();
    }

    Map<Uuid, Map.Entry<String, Set<Integer>>> uninitialized = subscribedTopicsChangeMap(groupId, subscriptionMetadata);
    if (uninitialized.isEmpty()) {
        // Nothing to initialize.
        return Optional.empty();
    }

    // One TopicData per topic, holding a partition state entry for each uninitialized partition.
    InitializeShareGroupStateParameters.Builder requestBuilder = new InitializeShareGroupStateParameters.Builder();
    requestBuilder.setGroupTopicPartitionData(
        new GroupTopicPartitionData<>(
            groupId,
            uninitialized.entrySet().stream()
                .map(topic -> new TopicData<>(
                    topic.getKey(),
                    topic.getValue().getValue().stream()
                        .map(partition -> PartitionFactory.newPartitionStateData(partition, groupEpoch, -1))
                        .toList()))
                .toList()
        )
    );
    return Optional.of(requestBuilder.build());
}
/**
@ -3104,12 +3229,17 @@ public class GroupMetadataManager {
List<CoordinatorRecord> records
) {
try {
Map<Uuid, Set<Integer>> initializedTopicPartitions = shareGroupPartitionMetadata.containsKey(group.groupId()) ?
shareGroupPartitionMetadata.get(group.groupId()).initializedTopics() :
Map.of();
TargetAssignmentBuilder.ShareTargetAssignmentBuilder assignmentResultBuilder =
new TargetAssignmentBuilder.ShareTargetAssignmentBuilder(group.groupId(), groupEpoch, shareGroupAssignor)
.withMembers(group.members())
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
.withTopicAssignablePartitionsMap(initializedTopicPartitions)
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
@ -3954,10 +4084,10 @@ public class GroupMetadataManager {
* @param context The request context.
* @param request The actual ShareGroupHeartbeat request.
*
* @return A Result containing the ShareGroupHeartbeat response and
* a list of records to update the state machine.
* @return A Result containing a pair of ShareGroupHeartbeat response and maybe InitializeShareGroupStateParameters
* and a list of records to update the state machine.
*/
public CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(
public CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> shareGroupHeartbeat(
RequestContext context,
ShareGroupHeartbeatRequestData request
) throws ApiException {
@ -3965,10 +4095,15 @@ public class GroupMetadataManager {
if (request.memberEpoch() == ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH) {
// -1 means that the member wants to leave the group.
return shareGroupLeave(
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = shareGroupLeave(
request.groupId(),
request.memberId(),
request.memberEpoch());
request.memberEpoch()
);
return new CoordinatorResult<>(
result.records(),
Map.entry(result.response(), Optional.empty())
);
}
// Otherwise, it is a regular heartbeat.
return shareGroupHeartbeat(
@ -3981,6 +4116,55 @@ public class GroupMetadataManager {
request.subscribedTopicNames());
}
/**
 * Handles an initialize share group state request. This is usually part of
 * the shareGroupHeartbeat code flow.
 * <p>
 * The partitions in {@code topicPartitionMap} are merged with the partitions already
 * tracked for the group in shareGroupPartitionMetadata, and a single
 * ShareGroupStatePartitionMetadata record carrying the combined view is returned.
 *
 * @param groupId           The group id corresponding to the share group whose share partitions have been initialized.
 * @param topicPartitionMap Map representing topic partition data to be added to the share state partition metadata.
 *
 * @return A Result containing ShareGroupStatePartitionMetadata records and Void response.
 *         The record list is empty when {@code topicPartitionMap} is null or empty.
 */
public CoordinatorResult<Void, CoordinatorRecord> initializeShareGroupState(
    String groupId,
    Map<Uuid, Set<Integer>> topicPartitionMap
) {
    // Should be present
    if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
        return new CoordinatorResult<>(List.of(), null);
    }

    // The group object used to be fetched (and cast) only to read back its id, which
    // turned a missing group into a NullPointerException. The id is used directly.
    // NOTE(review): the group is therefore not validated here; replay() of this record
    // resolves it through getOrMaybeCreatePersistedShareGroup(groupId, false) - confirm
    // that this is the desired place for the check.

    // We must combine the existing information in the record with the
    // topicPartitionMap argument. No existing information is an empty map.
    ShareGroupStatePartitionMetadataInfo currentInfo = shareGroupPartitionMetadata.get(groupId);
    Map<Uuid, Set<Integer>> alreadyInitialized = currentInfo == null ? Map.of() : currentInfo.initializedTopics();

    Set<Uuid> combinedTopicIdSet = new HashSet<>(alreadyInitialized.keySet());
    combinedTopicIdSet.addAll(topicPartitionMap.keySet());

    Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
    for (Uuid topicId : combinedTopicIdSet) {
        Set<Integer> partitions = new HashSet<>(alreadyInitialized.getOrDefault(topicId, Set.of()));
        partitions.addAll(topicPartitionMap.getOrDefault(topicId, Set.of()));

        // A topic may be missing from the metadata image (e.g. it was deleted after being
        // initialized). Fall back to a placeholder name instead of failing with a
        // NullPointerException or silently dropping the initialized partitions; replay()
        // of this record only reads the topic ids and partitions.
        String topicName = Optional.ofNullable(metadataImage.topics().getTopic(topicId))
            .map(topicImage -> topicImage.name())
            .orElse("<UNKNOWN>");

        finalMap.put(topicId, Map.entry(topicName, partitions));
    }

    // NOTE(review): deleting topics are always written as empty here, as before; any
    // deleting topics held in the current info are not carried over - confirm intent.
    return new CoordinatorResult<>(
        List.of(newShareGroupStatePartitionMetadataRecord(groupId, finalMap, Map.of())),
        null
    );
}
/**
* Replays ConsumerGroupMemberMetadataKey/Value to update the hard state of
* the consumer group. It updates the subscription part of the member or
@ -4747,6 +4931,41 @@ public class GroupMetadataManager {
}
}
/**
 * Replays ShareGroupStatePartitionMetadataKey/Value to update the hard state of
 * the share group. A null value is a tombstone and removes the group's entry from
 * shareGroupPartitionMetadata; otherwise the entry is replaced with the initialized
 * topic partitions and deleting topic ids found in the record.
 *
 * @param key   A ShareGroupStatePartitionMetadataKey key.
 * @param value A ShareGroupStatePartitionMetadataValue record, or null for a tombstone.
 */
public void replay(
    ShareGroupStatePartitionMetadataKey key,
    ShareGroupStatePartitionMetadataValue value
) {
    String groupId = key.groupId();
    getOrMaybeCreatePersistedShareGroup(groupId, false);

    if (value == null) {
        // Tombstone: forget everything tracked for the group.
        shareGroupPartitionMetadata.remove(groupId);
        return;
    }

    // Initialized partitions keyed by topic id.
    Map<Uuid, Set<Integer>> initializedTopics = value.initializedTopics().stream()
        .map(topicPartitionInfo -> Map.entry(topicPartitionInfo.topicId(), new HashSet<>(topicPartitionInfo.partitions())))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    // Only the ids of the deleting topics are kept.
    Set<Uuid> deletingTopics = value.deletingTopics().stream()
        .map(ShareGroupStatePartitionMetadataValue.TopicInfo::topicId)
        .collect(Collectors.toSet());

    shareGroupPartitionMetadata.put(
        groupId,
        new ShareGroupStatePartitionMetadataInfo(initializedTopics, deletingTopics)
    );
}
/**
* A new metadata image is available.
*

View File

@ -94,7 +94,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
// The current assignment from topic partition to members.
Map<TopicIdPartition, List<String>> currentAssignment = currentAssignment(groupSpec);
return newAssignmentHomogeneous(groupSpec, subscribedTopicIds, targetPartitions, currentAssignment);
return newAssignmentHomogeneous(groupSpec, subscribedTopicIds, targetPartitions, currentAssignment, subscribedTopicDescriber);
}
private GroupAssignment assignHeterogeneous(
@ -115,7 +115,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
// The current assignment from topic partition to members.
Map<TopicIdPartition, List<String>> currentAssignment = currentAssignment(groupSpec);
return newAssignmentHeterogeneous(groupSpec, memberToPartitionsSubscription, currentAssignment);
return newAssignmentHeterogeneous(groupSpec, memberToPartitionsSubscription, currentAssignment, subscribedTopicDescriber);
}
/**
@ -146,7 +146,8 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
GroupSpec groupSpec,
Set<Uuid> subscribedTopicIds,
List<TopicIdPartition> targetPartitions,
Map<TopicIdPartition, List<String>> currentAssignment
Map<TopicIdPartition, List<String>> currentAssignment,
SubscribedTopicDescriber subscribedTopicDescriber
) {
// For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
// That can be expressed as Math.ceil(numTargetPartitions / (double) numGroupMembers)
@ -195,7 +196,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
.filter(targetPartition -> !finalAssignmentByPartition.containsKey(targetPartition))
.toList();
roundRobinAssignmentWithCount(groupSpec.memberIds(), unassignedPartitions, finalAssignment, desiredAssignmentCount);
roundRobinAssignmentWithCount(groupSpec.memberIds(), unassignedPartitions, finalAssignment, desiredAssignmentCount, subscribedTopicDescriber);
return groupAssignment(finalAssignment, groupSpec.memberIds());
}
@ -210,7 +211,8 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
private GroupAssignment newAssignmentHeterogeneous(
GroupSpec groupSpec,
Map<String, List<TopicIdPartition>> memberToPartitionsSubscription,
Map<TopicIdPartition, List<String>> currentAssignment
Map<TopicIdPartition, List<String>> currentAssignment,
SubscribedTopicDescriber subscribedTopicDescriber
) {
int numGroupMembers = groupSpec.memberIds().size();
@ -238,7 +240,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
});
unassignedPartitions.keySet().forEach(unassignedTopic ->
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), unassignedPartitions.get(unassignedTopic), newAssignment));
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), unassignedPartitions.get(unassignedTopic), newAssignment, subscribedTopicDescriber));
// Step 3: We combine current assignment and new assignment.
Map<String, Set<TopicIdPartition>> finalAssignment = newHashMap(numGroupMembers);
@ -282,16 +284,18 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
/**
* This function assigns topic partitions to members by a round-robin approach and updates the existing assignment.
* @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty.
* @param partitionsToAssign The subscribed topic partitions which needs assignment.
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
* method can be called multiple times for heterogeneous assignment.
* @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty.
* @param partitionsToAssign The subscribed topic partitions which need assignment.
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
* method can be called multiple times for heterogeneous assignment.
* @param subscribedTopicDescriber The topic describer to fetch assignable partitions from.
*/
// Visible for testing
void roundRobinAssignment(
Collection<String> memberIds,
List<TopicIdPartition> partitionsToAssign,
Map<TopicIdPartition, List<String>> assignment
Map<TopicIdPartition, List<String>> assignment,
SubscribedTopicDescriber subscribedTopicDescriber
) {
// We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions),
// we again start from the starting index of memberIds.
@ -301,25 +305,29 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
memberIdIterator = memberIds.iterator();
}
String memberId = memberIdIterator.next();
assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId);
if (subscribedTopicDescriber.assignablePartitions(topicPartition.topicId()).contains(topicPartition.partitionId())) {
assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId);
}
}
}
/**
* This function assigns topic partitions to members by a round-robin approach and updates the existing assignment.
* @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty.
* @param partitionsToAssign The subscribed topic partitions which need assignment.
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
* method can be called multiple times for heterogeneous assignment.
* @param desiredAssignmentCount The number of partitions which can be assigned to each member to give even balance.
* Note that this number can be exceeded by one to allow for situations
* in which we have hashing collisions.
* @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty.
* @param partitionsToAssign The subscribed topic partitions which need assignment.
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
* method can be called multiple times for heterogeneous assignment.
* @param desiredAssignmentCount The number of partitions which can be assigned to each member to give even balance.
* Note that this number can be exceeded by one to allow for situations
* in which we have hashing collisions.
* @param subscribedTopicDescriber The topic describer to fetch assignable partitions from.
*/
void roundRobinAssignmentWithCount(
Collection<String> memberIds,
List<TopicIdPartition> partitionsToAssign,
Map<String, Set<TopicIdPartition>> assignment,
int desiredAssignmentCount
int desiredAssignmentCount,
SubscribedTopicDescriber subscribedTopicDescriber
) {
Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
@ -329,6 +337,9 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
ListIterator<TopicIdPartition> partitionListIterator = partitionsToAssign.listIterator();
while (partitionListIterator.hasNext()) {
TopicIdPartition partition = partitionListIterator.next();
if (!subscribedTopicDescriber.assignablePartitions(partition.topicId()).contains(partition.partitionId())) {
continue;
}
if (!memberIdIterator.hasNext()) {
memberIdIterator = memberIdsCopy.iterator();
if (memberIdsCopy.isEmpty()) {
@ -362,9 +373,13 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
);
}
for (int i = 0; i < numPartitions; i++) {
targetPartitions.add(new TopicIdPartition(topicId, i));
}
// Since we are returning a list here, we can keep it sorted
// to add determinism while testing and iterating.
targetPartitions.addAll(subscribedTopicDescriber.assignablePartitions(topicId).stream()
.sorted()
.map(partition -> new TopicIdPartition(topicId, partition))
.toList()
);
});
return targetPartitions;
}

View File

@ -23,6 +23,8 @@ import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain
@ -34,9 +36,15 @@ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
* object, which contains topic and partition metadata.
*/
private final Map<Uuid, TopicMetadata> topicMetadata;
private final Map<Uuid, Set<Integer>> topicPartitionAllowedMap;
public SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata> topicMetadata) {
this(topicMetadata, null);
}
public SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata> topicMetadata, Map<Uuid, Set<Integer>> topicPartitionAllowedMap) {
this.topicMetadata = Objects.requireNonNull(topicMetadata);
this.topicPartitionAllowedMap = topicPartitionAllowedMap;
}
/**
@ -74,6 +82,29 @@ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
return Set.of();
}
/**
* Returns a set of assignable partitions from the topic metadata.
* If the allowed partition map is null, all the partitions in the corresponding
* topic metadata are returned for the argument topic id. If allowed map is empty,
* empty set is returned.
*
* @param topicId The uuid of the topic
* @return Set of integers if assignable partitions available, empty otherwise.
*/
@Override
public Set<Integer> assignablePartitions(Uuid topicId) {
TopicMetadata topic = this.topicMetadata.get(topicId);
if (topic == null) {
return Set.of();
}
if (topicPartitionAllowedMap == null) {
return IntStream.range(0, topic.numPartitions()).boxed().collect(Collectors.toUnmodifiableSet());
}
return topicPartitionAllowedMap.getOrDefault(topicId, Set.of());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -287,6 +287,11 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
*/
private Map<String, String> staticMembers = new HashMap<>();
/**
* Topic partition assignable map.
*/
private Map<Uuid, Set<Integer>> topicAssignablePartitionsMap = new HashMap<>();
/**
* Constructs the object.
*
@ -395,6 +400,13 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
return self();
}
public U withTopicAssignablePartitionsMap(
Map<Uuid, Set<Integer>> topicAssignablePartitionsMap
) {
this.topicAssignablePartitionsMap = topicAssignablePartitionsMap;
return self();
}
/**
* Adds or updates a member. This is useful when the updated member is
* not yet materialized in memory.
@ -483,11 +495,10 @@ public abstract class TargetAssignmentBuilder<T extends ModernGroupMember, U ext
subscriptionType,
invertedTargetAssignment
),
new SubscribedTopicDescriberImpl(topicMetadataMap)
new SubscribedTopicDescriberImpl(topicMetadataMap, topicAssignablePartitionsMap)
);
// Compute delta from previous to new target assignment and create the
// relevant records.
// Compute delta from previous to new target assignment and create the relevant records.
List<CoordinatorRecord> records = new ArrayList<>();
for (String memberId : memberSpecs.keySet()) {

View File

@ -45,6 +45,8 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
@ -58,6 +60,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -283,6 +286,56 @@ public class GroupCoordinatorRecordHelpersTest {
));
}
@Test
public void testNewShareGroupPartitionMetadataRecord() {
String groupId = "group-id";
String topicName1 = "t1";
Uuid topicId1 = Uuid.randomUuid();
String topicName2 = "t2";
Uuid topicId2 = Uuid.randomUuid();
Set<Integer> partitions = new LinkedHashSet<>();
partitions.add(0);
partitions.add(1);
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(
List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(topicId1)
.setTopicName(topicName1)
.setPartitions(List.of(0, 1))
)
)
.setDeletingTopics(
List.of(
new ShareGroupStatePartitionMetadataValue.TopicInfo()
.setTopicId(topicId2)
.setTopicName(topicName2)
)
),
(short) 0
)
);
CoordinatorRecord record = GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
groupId,
Map.of(
topicId1,
Map.entry(topicName1, partitions)
),
Map.of(
topicId2,
topicName2
)
);
assertEquals(expectedRecord, record);
}
@Test
public void testNewConsumerGroupTargetAssignmentRecord() {
Uuid topicId1 = Uuid.randomUuid();

View File

@ -92,6 +92,8 @@ import org.apache.kafka.server.share.persister.DefaultStatePersister;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateResult;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateResult;
import org.apache.kafka.server.share.persister.NoOpStatePersister;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionIdData;
@ -100,6 +102,7 @@ import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParamet
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryResult;
import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@ -115,6 +118,7 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
@ -2637,7 +2641,10 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(
new ShareGroupHeartbeatResponseData()
Map.entry(
new ShareGroupHeartbeatResponseData(),
Optional.empty()
)
));
CompletableFuture<ShareGroupHeartbeatResponseData> future = service.shareGroupHeartbeat(
@ -3090,6 +3097,212 @@ public class GroupCoordinatorServiceTest {
assertEquals(responseData, future.get());
}
@Test
public void testPersisterInitializeSuccess() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister mockPersister = mock(Persister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(mockPersister)
.build(true);
String groupId = "share-group";
Uuid topicId = Uuid.randomUuid();
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId, "topic-name", 3)
.build();
service.onNewMetadataImage(image, null);
when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder()
.setTopicsData(List.of(
new TopicData<>(topicId, List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())
))
)).build()
));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData();
InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(
new GroupTopicPartitionData<>(groupId,
List.of(
new TopicData<>(topicId, List.of(PartitionFactory.newPartitionStateData(0, 0, -1)))
))
)
.build();
assertEquals(defaultResponse, service.persisterInitialize(params, defaultResponse).getNow(null));
verify(runtime, times(1)).scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
}
@Test
public void testPersisterInitializeFailure() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister mockPersister = mock(Persister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(mockPersister)
.build(true);
String groupId = "share-group";
Uuid topicId = Uuid.randomUuid();
Exception exp = new NotCoordinatorException("bad stuff");
when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.failedFuture(exp));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData();
InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(
new GroupTopicPartitionData<>(groupId,
List.of(
new TopicData<>(topicId, List.of(PartitionFactory.newPartitionStateData(0, 0, -1)))
))
).build();
assertEquals(Errors.forException(exp).code(), service.persisterInitialize(params, defaultResponse).getNow(null).errorCode());
verify(runtime, times(0)).scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
}
@Test
public void testPersisterInitializePartialFailure() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister mockPersister = mock(Persister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(mockPersister)
.build(true);
String groupId = "share-group";
Uuid topicId = Uuid.randomUuid();
when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder()
.setTopicsData(List.of(
new TopicData<>(topicId, List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()),
PartitionFactory.newPartitionErrorData(1, Errors.TOPIC_AUTHORIZATION_FAILED.code(), Errors.TOPIC_AUTHORIZATION_FAILED.message())
))
)).build()
));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(null));
ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData();
InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(
new GroupTopicPartitionData<>(groupId,
List.of(
new TopicData<>(topicId, List.of(
PartitionFactory.newPartitionStateData(0, 0, -1),
PartitionFactory.newPartitionStateData(1, 0, -1)
))
))
).build();
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), service.persisterInitialize(params, defaultResponse).getNow(null).errorCode());
verify(runtime, times(0)).scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
}
@Test
public void testPersisterInitializeGroupInitializeFailure() {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
Persister mockPersister = mock(Persister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
.setPersister(mockPersister)
.build(true);
String groupId = "share-group";
Uuid topicId = Uuid.randomUuid();
Exception exp = new CoordinatorNotAvailableException("bad stuff");
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId, "topic-name", 3)
.build();
service.onNewMetadataImage(image, null);
when(mockPersister.initializeState(any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder()
.setTopicsData(List.of(
new TopicData<>(topicId, List.of(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())
))
)).build()
));
when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.failedFuture(exp));
ShareGroupHeartbeatResponseData defaultResponse = new ShareGroupHeartbeatResponseData();
InitializeShareGroupStateParameters params = new InitializeShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(
new GroupTopicPartitionData<>(groupId,
List.of(
new TopicData<>(topicId, List.of(
PartitionFactory.newPartitionStateData(0, 0, -1)
))
))
).build();
assertEquals(Errors.forException(exp).code(), service.persisterInitialize(params, defaultResponse).getNow(null).errorCode());
verify(runtime, times(1)).scheduleWriteOperation(
ArgumentMatchers.eq("initialize-share-group-state"),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
verify(mockPersister, times(1)).initializeState(ArgumentMatchers.any());
}
@FunctionalInterface
private interface TriFunction<A, B, C, R> {
R apply(A a, B b, C c);
@ -3119,7 +3332,8 @@ public class GroupCoordinatorServiceTest {
runtime,
metrics,
configManager,
persister
persister,
new MockTimer()
);
if (serviceStartup) {

View File

@ -88,6 +88,7 @@ import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionIdData;
import org.apache.kafka.server.share.persister.TopicData;
@ -1609,9 +1610,12 @@ public class GroupCoordinatorShardTest {
RequestContext context = requestContext(ApiKeys.SHARE_GROUP_HEARTBEAT);
ShareGroupHeartbeatRequestData request = new ShareGroupHeartbeatRequestData();
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = new CoordinatorResult<>(
CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = new CoordinatorResult<>(
List.of(),
new ShareGroupHeartbeatResponseData()
Map.entry(
new ShareGroupHeartbeatResponseData(),
Optional.empty()
)
);
when(groupMetadataManager.shareGroupHeartbeat(

View File

@ -81,6 +81,10 @@ import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue.Endpoint;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.modern.Assignment;
@ -112,8 +116,10 @@ import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PartitionIdData;
import org.apache.kafka.server.share.persister.PartitionStateData;
import org.apache.kafka.server.share.persister.TopicData;
import org.junit.jupiter.api.Test;
@ -126,6 +132,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -153,6 +160,7 @@ import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError;
import static org.apache.kafka.coordinator.group.GroupMetadataManager.classicGroupHeartbeatKey;
@ -3290,14 +3298,14 @@ public class GroupMetadataManagerTest {
));
// Session timer is scheduled on first heartbeat.
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result =
CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result =
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo")));
assertEquals(1, result.response().memberEpoch());
assertEquals(1, result.response().getKey().memberEpoch());
// Verify that there is a session time.
context.assertSessionTimeout(groupId, memberId, 45000);
@ -14890,18 +14898,41 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(0), 100));
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupIds.get(1), 15));
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
Uuid topicId = Uuid.randomUuid();
String topicName = "foo";
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId, topicName, 1)
.build();
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(image)
.build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupIds.get(1))
.setMemberId(Uuid.randomUuid().toString())
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo")));
.setSubscribedTopicNames(List.of(topicName)));
// Verify that a member id was generated for the new member.
String memberId = result.response().memberId();
String memberId = result.response().getKey().memberId();
assertNotNull(memberId);
context.commit();
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(
topicId,
Set.of(0)
),
groupIds.get(1),
16,
true
);
List<ShareGroupDescribeResponseData.DescribedGroup> expected = List.of(
new ShareGroupDescribeResponseData.DescribedGroup()
.setGroupEpoch(100)
@ -14917,7 +14948,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(16)
.setClientId("client")
.setClientHost("localhost/127.0.0.1")
.setSubscribedTopicNames(List.of("foo"))
.setSubscribedTopicNames(List.of(topicName))
.build()
.asShareGroupDescribeMember(
new MetadataImageBuilder().build().topics()
@ -14944,16 +14975,46 @@ public class GroupMetadataManagerTest {
));
String memberId = Uuid.randomUuid().toString();
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
Uuid topicId1 = Uuid.randomUuid();
String topicName1 = "foo";
Uuid topicId2 = Uuid.randomUuid();
String topicName2 = "bar";
String groupId = "group-foo";
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId1, topicName1, 1)
.addTopic(topicId2, topicName2, 1)
.build();
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(image)
.build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId("group-foo")
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo", "bar")));
.setSubscribedTopicNames(List.of(topicName1, topicName2)));
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(
topicId1,
Set.of(0),
topicId2,
Set.of(0)
),
groupId,
1,
true
);
assertEquals(
memberId,
result.response().memberId(),
result.response().getKey().memberId(),
"MemberId should remain unchanged, as the server does not generate a new one since the consumer generates its own."
);
@ -14966,7 +15027,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(1)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ShareGroupHeartbeatResponseData.Assignment()),
result.response()
result.response().getKey()
);
}
@ -14998,13 +15059,42 @@ public class GroupMetadataManagerTest {
.withShareGroupAssignor(new NoOpPartitionAssignor())
.build();
Uuid topicId1 = Uuid.randomUuid();
String topicName1 = "foo";
Uuid topicId2 = Uuid.randomUuid();
String topicName2 = "bar";
MetadataImage image = new MetadataImageBuilder()
.addTopic(topicId1, topicName1, 1)
.addTopic(topicId2, topicName2, 1)
.build();
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(image)
.build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
// A first member joins to create the group.
context.shareGroupHeartbeat(
CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo", "bar")));
.setSubscribedTopicNames(List.of(topicName1, topicName2)));
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(
topicId1,
Set.of(0),
topicId2,
Set.of(0)
),
groupId,
1,
true
);
// The second member is rejected because the member id is unknown and
// the member epoch is not zero.
@ -15047,13 +15137,37 @@ public class GroupMetadataManagerTest {
assertThrows(GroupIdNotFoundException.class, () ->
context.groupMetadataManager.shareGroup(groupId));
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
MetadataImage image = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build();
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(image)
.build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo", "bar")));
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(
fooTopicId,
Set.of(0, 1, 2, 3, 4, 5),
barTopicId,
Set.of(0, 1, 2)
),
groupId,
1,
true
);
assertResponseEquals(
new ShareGroupHeartbeatResponseData()
.setMemberId(memberId)
@ -15068,7 +15182,7 @@ public class GroupMetadataManagerTest {
.setTopicId(barTopicId)
.setPartitions(List.of(0, 1, 2))
))),
result.response()
result.response().getKey()
);
ShareGroupMember expectedMember = new ShareGroupMember.Builder(memberId)
@ -15158,18 +15272,26 @@ public class GroupMetadataManagerTest {
.build();
// Member 2 leaves the consumer group.
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setSubscribedTopicNames(List.of("foo", "bar")));
.setSubscribedTopicNames(List.of(fooTopicName, barTopicName)));
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(),
"",
-1,
false
);
assertResponseEquals(
new ShareGroupHeartbeatResponseData()
.setMemberId(memberId2)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
result.response()
result.response().getKey()
);
List<CoordinatorRecord> expectedRecords = List.of(
@ -15208,14 +15330,43 @@ public class GroupMetadataManagerTest {
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 100));
Uuid fooTopicId = Uuid.randomUuid();
String fooTopicName = "foo";
Uuid barTopicId = Uuid.randomUuid();
String barTopicName = "bar";
MetadataImage image = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 1)
.addTopic(barTopicId, barTopicName, 1)
.build();
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(image)
.build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
// Member 1 joins the group.
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = context.shareGroupHeartbeat(
CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo", "bar")));
assertEquals(101, result.response().memberEpoch());
.setSubscribedTopicNames(List.of(fooTopicName, barTopicName)));
assertEquals(101, result.response().getKey().memberEpoch());
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(
fooTopicId,
Set.of(0),
barTopicId,
Set.of(0)
),
groupId,
101,
true
);
// Member 2 joins the group.
assertThrows(GroupMaxSizeReachedException.class, () -> context.shareGroupHeartbeat(
@ -15404,18 +15555,39 @@ public class GroupMetadataManagerTest {
)))
));
MetadataImage image = new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build();
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(image)
.build();
context.groupMetadataManager.onNewMetadataImage(image, delta);
// Session timer is scheduled on first heartbeat.
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result =
CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result =
context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo")));
assertEquals(1, result.response().memberEpoch());
assertEquals(1, result.response().getKey().memberEpoch());
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(
fooTopicId,
Set.of(0, 1, 2, 3, 4, 5)
),
groupId,
1,
true
);
// Verify heartbeat interval
assertEquals(5000, result.response().heartbeatIntervalMs());
assertEquals(5000, result.response().getKey().heartbeatIntervalMs());
// Verify that there is a session timeout.
context.assertSessionTimeout(groupId, memberId, 45000);
@ -15423,7 +15595,7 @@ public class GroupMetadataManagerTest {
// Advance time.
assertEquals(
List.of(),
context.sleep(result.response().heartbeatIntervalMs())
context.sleep(result.response().getKey().heartbeatIntervalMs())
);
// Dynamic update group config
@ -15432,16 +15604,37 @@ public class GroupMetadataManagerTest {
newGroupConfig.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
context.updateGroupConfig(groupId, newGroupConfig);
context.groupMetadataManager.replay(
new ShareGroupStatePartitionMetadataKey()
.setGroupId(groupId),
new ShareGroupStatePartitionMetadataValue()
.setInitializedTopics(List.of(
new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
.setTopicId(fooTopicId)
.setTopicName(fooTopicName)
.setPartitions(List.of(0, 1, 2, 3, 4, 5))
))
.setDeletingTopics(List.of())
);
// Session timer is rescheduled on second heartbeat.
result = context.shareGroupHeartbeat(
new ShareGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(result.response().memberEpoch()));
assertEquals(1, result.response().memberEpoch());
.setMemberEpoch(result.response().getKey().memberEpoch()));
assertEquals(1, result.response().getKey().memberEpoch());
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(),
"",
0,
false
);
// Verify heartbeat interval
assertEquals(10000, result.response().heartbeatIntervalMs());
assertEquals(10000, result.response().getKey().heartbeatIntervalMs());
// Verify that there is a session timeout.
context.assertSessionTimeout(groupId, memberId, 50000);
@ -15449,7 +15642,7 @@ public class GroupMetadataManagerTest {
// Advance time.
assertEquals(
List.of(),
context.sleep(result.response().heartbeatIntervalMs())
context.sleep(result.response().getKey().heartbeatIntervalMs())
);
// Session timer is cancelled on leave.
@ -15458,7 +15651,15 @@ public class GroupMetadataManagerTest {
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
assertEquals(LEAVE_GROUP_MEMBER_EPOCH, result.response().memberEpoch());
assertEquals(LEAVE_GROUP_MEMBER_EPOCH, result.response().getKey().memberEpoch());
verifyShareGroupHeartbeatInitializeRequest(
result.response().getValue(),
Map.of(),
"",
0,
false
);
// Verify that there are no timers.
context.assertNoSessionTimeout(groupId, memberId);
@ -17488,6 +17689,287 @@ public class GroupMetadataManagerTest {
verify(shareGroup, times(1)).groupId();
}
/**
 * Checks the optional initialize request returned alongside a share group heartbeat:
 * the joining heartbeat asks for every partition of the subscribed topics, and after
 * those partitions are replayed as initialized and one topic grows, the next heartbeat
 * asks only for the newly added partitions.
 */
@Test
public void testShareGroupHeartbeatInitializeOnPartitionUpdate() {
    MockPartitionAssignor assignor = new MockPartitionAssignor("range");
    assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
    GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
        .withShareGroupAssignor(assignor)
        .build();

    Uuid t1Uuid = Uuid.randomUuid();
    String t1Name = "t1";
    Uuid t2Uuid = Uuid.randomUuid();
    String t2Name = "t2";
    MetadataImage image = new MetadataImageBuilder()
        .addTopic(t1Uuid, t1Name, 2)
        .addTopic(t2Uuid, t2Name, 2)
        .build();

    String groupId = "share-group";

    context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));

    // The member id is only ever used in its string form, so convert it once.
    String memberId = Uuid.randomUuid().toString();

    // Joining heartbeat (member epoch 0): all partitions of both topics are expected
    // in the initialize request, carrying state epoch 1.
    CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result = context.shareGroupHeartbeat(
        new ShareGroupHeartbeatRequestData()
            .setGroupId(groupId)
            .setMemberId(memberId)
            .setMemberEpoch(0)
            .setSubscribedTopicNames(List.of(t1Name, t2Name)));

    verifyShareGroupHeartbeatInitializeRequest(
        result.response().getValue(),
        Map.of(
            t1Uuid,
            Set.of(0, 1),
            t2Uuid,
            Set.of(0, 1)
        ),
        groupId,
        1,
        true
    );

    // Replay the partition metadata record so partitions 0 and 1 of both topics
    // count as already initialized.
    context.groupMetadataManager.replay(
        new ShareGroupStatePartitionMetadataKey()
            .setGroupId(groupId),
        new ShareGroupStatePartitionMetadataValue()
            .setInitializedTopics(List.of(
                new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
                    .setTopicId(t1Uuid)
                    .setTopicName(t1Name)
                    .setPartitions(List.of(0, 1)),
                new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
                    .setTopicId(t2Uuid)
                    .setTopicName(t2Name)
                    .setPartitions(List.of(0, 1))
            ))
            .setDeletingTopics(List.of())
    );

    // Partition increase
    image = new MetadataImageBuilder()
        .addTopic(t1Uuid, t1Name, 4)
        .addTopic(t2Uuid, t2Name, 2)
        .build();

    context.groupMetadataManager.onNewMetadataImage(image, mock(MetadataDelta.class));

    assignor.prepareGroupAssignment(new GroupAssignment(
        Map.of(
            memberId,
            new MemberAssignmentImpl(
                Map.of(
                    t1Uuid,
                    Set.of(0, 1, 2, 3),
                    t2Uuid,
                    Set.of(0, 1)
                )
            )
        )
    ));

    // Follow-up heartbeat with an unchanged subscription (null topic names).
    result = context.shareGroupHeartbeat(
        new ShareGroupHeartbeatRequestData()
            .setGroupId(groupId)
            .setMemberId(memberId)
            .setMemberEpoch(1)
            .setSubscribedTopicNames(null));

    // Only the partitions added to t1 are expected, with state epoch 2.
    verifyShareGroupHeartbeatInitializeRequest(
        result.response().getValue(),
        Map.of(
            t1Uuid,
            Set.of(2, 3)
        ),
        groupId,
        2,
        true
    );
}
/**
 * Initializing share group state for a topic present in the metadata image should
 * return a null response and exactly one partition metadata record for that topic.
 */
@Test
public void testShareGroupInitializeSuccess() {
    String groupId = "groupId";
    Uuid topicId = Uuid.randomUuid();
    String topicName = "t1";
    Set<Integer> topicPartitions = Set.of(0, 1);

    MockPartitionAssignor assignor = new MockPartitionAssignor("range");
    assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));

    MetadataImage metadataImage = new MetadataImageBuilder()
        .addTopic(topicId, topicName, 2)
        .build();
    GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
        .withShareGroupAssignor(assignor)
        .withMetadataImage(metadataImage)
        .build();

    // Replay the share group metadata record (epoch 0) before calling initialize.
    ShareGroupMetadataKey groupKey = new ShareGroupMetadataKey().setGroupId(groupId);
    ShareGroupMetadataValue groupValue = new ShareGroupMetadataValue().setEpoch(0);
    context.groupMetadataManager.replay(groupKey, groupValue);

    // Input handed to the manager: topic id -> partitions.
    Map<Uuid, Set<Integer>> snapshotMetadataInitializeMap = Map.of(topicId, topicPartitions);

    // Shape used to build the expected record: topic id -> (topic name, partitions).
    Map<Uuid, Map.Entry<String, Set<Integer>>> snapshotMetadataInitializeRecordMap =
        Map.of(topicId, Map.entry(topicName, topicPartitions));

    CoordinatorResult<Void, CoordinatorRecord> result =
        context.groupMetadataManager.initializeShareGroupState(groupId, snapshotMetadataInitializeMap);

    CoordinatorRecord expectedRecord =
        newShareGroupStatePartitionMetadataRecord(groupId, snapshotMetadataInitializeRecordMap, Map.of());

    assertNull(result.response());
    assertEquals(List.of(expectedRecord), result.records());
}
/**
 * Initializing share group state with an empty or a null topic-partition map should
 * return a null response and no records.
 */
@Test
public void testShareGroupInitializeEmptyMap() {
    String groupId = "groupId";

    MockPartitionAssignor assignor = new MockPartitionAssignor("range");
    assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
    GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
        .withShareGroupAssignor(assignor)
        .build();

    // Replay the share group metadata record (epoch 0) before calling initialize.
    ShareGroupMetadataKey groupKey = new ShareGroupMetadataKey().setGroupId(groupId);
    ShareGroupMetadataValue groupValue = new ShareGroupMetadataValue().setEpoch(0);
    context.groupMetadataManager.replay(groupKey, groupValue);

    // Empty map.
    CoordinatorResult<Void, CoordinatorRecord> emptyMapResult =
        context.groupMetadataManager.initializeShareGroupState(groupId, Map.of());
    assertNull(emptyMapResult.response());
    assertEquals(List.of(), emptyMapResult.records());

    // Null map.
    CoordinatorResult<Void, CoordinatorRecord> nullMapResult =
        context.groupMetadataManager.initializeShareGroupState(groupId, null);
    assertNull(nullMapResult.response());
    assertEquals(List.of(), nullMapResult.records());
}
/**
 * Exercises {@code subscribedTopicsChangeMap} in four steps: an empty metadata image,
 * an empty subscription, a subscription with no initialized-topics record replayed,
 * and a subscription where one of two topics has already been replayed as initialized.
 */
@Test
public void testSubscribedTopicsChangeMap() {
    String topicName = "foo";
    Uuid topicId = Uuid.randomUuid();
    int partitionCount = 1;
    String groupId = "foogrp";
    TopicMetadata fooMetadata = new TopicMetadata(topicId, topicName, partitionCount);

    MockPartitionAssignor assignor = new MockPartitionAssignor("simple");
    assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
    GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
        .withShareGroupAssignor(assignor)
        .build();

    // Empty on empty metadata image
    MetadataImage emptyImage = MetadataImage.EMPTY;
    MetadataDelta emptyDelta = new MetadataDelta.Builder()
        .setImage(emptyImage)
        .build();
    context.groupMetadataManager.onNewMetadataImage(emptyImage, emptyDelta);
    assertEquals(
        Map.of(),
        context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of(topicName, fooMetadata))
    );

    // Empty on empty subscription metadata
    MetadataImage fooImage = new MetadataImageBuilder()
        .addTopic(topicId, topicName, partitionCount)
        .build();
    MetadataDelta fooDelta = new MetadataDelta.Builder()
        .setImage(fooImage)
        .build();
    context.groupMetadataManager.onNewMetadataImage(fooImage, fooDelta);
    assertEquals(
        Map.of(),
        context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of())
    );

    // No error on empty initialized metadata (no replay of initialized topics)
    MetadataImage fooImageAgain = new MetadataImageBuilder()
        .addTopic(topicId, topicName, partitionCount)
        .build();
    MetadataDelta fooDeltaAgain = new MetadataDelta.Builder()
        .setImage(fooImageAgain)
        .build();
    context.groupMetadataManager.onNewMetadataImage(fooImageAgain, fooDeltaAgain);
    assertEquals(
        Map.of(topicId, Map.entry(topicName, Set.of(0))),
        context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of(topicName, fooMetadata))
    );

    // Calculates correct diff
    String t1Name = "t1";
    Uuid t1Id = Uuid.randomUuid();
    String t2Name = "t2";
    Uuid t2Id = Uuid.randomUuid();
    MetadataImage twoTopicImage = new MetadataImageBuilder()
        .addTopic(t1Id, t1Name, 2)
        .addTopic(t2Id, t2Name, 2)
        .build();
    MetadataDelta twoTopicDelta = new MetadataDelta.Builder()
        .setImage(twoTopicImage)
        .build();
    context.groupMetadataManager.onNewMetadataImage(twoTopicImage, twoTopicDelta);

    // Replay the share group metadata record, then the record marking t1 as initialized.
    context.groupMetadataManager.replay(
        new ShareGroupMetadataKey().setGroupId(groupId),
        new ShareGroupMetadataValue().setEpoch(0)
    );
    ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo t1Initialized =
        new ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
            .setTopicId(t1Id)
            .setTopicName(t1Name)
            .setPartitions(List.of(0, 1));
    context.groupMetadataManager.replay(
        new ShareGroupStatePartitionMetadataKey().setGroupId(groupId),
        new ShareGroupStatePartitionMetadataValue()
            .setInitializedTopics(List.of(t1Initialized))
            .setDeletingTopics(List.of())
    );

    // Since t1 is already initialized due to replay above
    assertEquals(
        Map.of(t2Id, Map.entry(t2Name, Set.of(0, 1))),
        context.groupMetadataManager.subscribedTopicsChangeMap(groupId, Map.of(
            t1Name, new TopicMetadata(t1Id, t1Name, 2),
            t2Name, new TopicMetadata(t2Id, t2Name, 2)
        ))
    );
}
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,
@ -17546,4 +18028,30 @@ public class GroupMetadataManagerTest {
assertEquals(expectedSuccessCount, successCount);
return memberIds;
}
/**
 * Asserts on the optional initialize request returned with a share group heartbeat.
 *
 * When {@code shouldExist} is false, only the absence of the request is asserted and
 * the remaining arguments are ignored. Otherwise the request must be present, carry
 * {@code groupId}, and every partition must have {@code stateEpoch} as its state epoch
 * and -1 as its start offset; the topic id to partition ids mapping built from the
 * request must equal {@code expectedTopicPartitionsMap}.
 *
 * @param initRequest                the optional request taken from the heartbeat result
 * @param expectedTopicPartitionsMap expected partitions per topic id
 * @param groupId                    expected group id
 * @param stateEpoch                 expected state epoch of every partition
 * @param shouldExist                whether a request is expected at all
 */
private void verifyShareGroupHeartbeatInitializeRequest(
    Optional<InitializeShareGroupStateParameters> initRequest,
    Map<Uuid, Set<Integer>> expectedTopicPartitionsMap,
    String groupId,
    int stateEpoch,
    boolean shouldExist
) {
    if (!shouldExist) {
        assertTrue(initRequest.isEmpty());
        return;
    }

    assertTrue(initRequest.isPresent());
    InitializeShareGroupStateParameters request = initRequest.get();
    assertEquals(groupId, request.groupTopicPartitionData().groupId());

    // Plain loops keep the per-partition assertions out of a stream pipeline, where
    // they would run as side effects of an intermediate operation.
    Map<Uuid, Set<Integer>> actualTopicPartitionsMap = new HashMap<>();
    for (TopicData<PartitionStateData> topicData : request.groupTopicPartitionData().topicsData()) {
        Set<Integer> partitions = actualTopicPartitionsMap.computeIfAbsent(topicData.topicId(), k -> new HashSet<>());
        for (PartitionStateData partitionData : topicData.partitions()) {
            assertEquals(stateEpoch, partitionData.stateEpoch());
            assertEquals(-1, partitionData.startOffset());
            partitions.add(partitionData.partition());
        }
    }

    assertEquals(expectedTopicPartitionsMap, actualTopicPartitionsMap);
}
}

View File

@ -113,6 +113,7 @@ import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.timeline.SnapshotRegistry;
import java.net.InetAddress;
@ -653,7 +654,7 @@ public class GroupMetadataManagerTestContext {
return result;
}
public CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(
public CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> shareGroupHeartbeat(
ShareGroupHeartbeatRequestData request
) {
RequestContext context = new RequestContext(
@ -672,10 +673,11 @@ public class GroupMetadataManagerTestContext {
false
);
CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = groupMetadataManager.shareGroupHeartbeat(
context,
request
);
CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result =
groupMetadataManager.shareGroupHeartbeat(
context,
request
);
result.records().forEach(this::replay);
return result;

View File

@ -21,6 +21,7 @@ import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
@ -45,6 +46,9 @@ import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class SimpleAssignorTest {
@ -414,7 +418,13 @@ public class SimpleAssignorTest {
Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
assignment.put(partition1, List.of(member1));
assignor.roundRobinAssignment(members, unassignedPartitions, assignment);
SubscribedTopicDescriber describer = mock(SubscribedTopicDescriber.class);
when(describer.assignablePartitions(eq(TOPIC_1_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_2_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_3_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_4_UUID))).thenReturn(Set.of(0));
assignor.roundRobinAssignment(members, unassignedPartitions, assignment, describer);
Map<TopicIdPartition, List<String>> expectedAssignment = Map.of(
partition1, List.of(member1),
partition2, List.of(member1),
@ -440,7 +450,13 @@ public class SimpleAssignorTest {
assignment.put(member1, new HashSet<>(Set.of(partition1)));
assignment.put(member2, new HashSet<>(Set.of(partition1)));
assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2);
SubscribedTopicDescriber describer = mock(SubscribedTopicDescriber.class);
when(describer.assignablePartitions(eq(TOPIC_1_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_2_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_3_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_4_UUID))).thenReturn(Set.of(0));
assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2, describer);
Map<String, Set<TopicIdPartition>> expectedAssignment = Map.of(
member1, Set.of(partition1, partition2, partition4),
member2, Set.of(partition1, partition3)
@ -466,8 +482,14 @@ public class SimpleAssignorTest {
assignment.put(member1, new HashSet<>(Set.of(partition1)));
assignment.put(member2, new HashSet<>(Set.of(partition1)));
SubscribedTopicDescriber describer = mock(SubscribedTopicDescriber.class);
when(describer.assignablePartitions(eq(TOPIC_1_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_2_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_3_UUID))).thenReturn(Set.of(0));
when(describer.assignablePartitions(eq(TOPIC_4_UUID))).thenReturn(Set.of(0, 1, 2));
assertThrows(PartitionAssignorException.class,
() -> assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2));
() -> assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2, describer));
}
@Test

View File

@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@ -79,4 +80,32 @@ public class SubscribedTopicMetadataTest {
topicMetadataMap2.put(topicId, new TopicMetadata(topicId, "newTopic", 5));
assertNotEquals(new SubscribedTopicDescriberImpl(topicMetadataMap2), subscribedTopicMetadata);
}
/**
 * Checks {@code assignablePartitions} for a 5-partition topic under four allow maps:
 * null, empty, a subset of the topic's partitions, and all of its partitions.
 */
@Test
public void testAssignablePartitions() {
    String t1Name = "t1";
    Uuid t1Id = Uuid.randomUuid();
    // Register the topic before building any describer so the test does not depend on
    // the describer observing later mutations of the map it was constructed with.
    topicMetadataMap.put(t1Id, new TopicMetadata(t1Id, t1Name, 5));

    // null allow map (all partitions assignable)
    subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap, null);
    assertEquals(Set.of(0, 1, 2, 3, 4), subscribedTopicMetadata.assignablePartitions(t1Id));

    // empty allow map (nothing assignable)
    subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadataMap, Map.of());
    assertEquals(Set.of(), subscribedTopicMetadata.assignablePartitions(t1Id));

    // few assignable partitions (ids stay within the topic's range 0..4)
    subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
        topicMetadataMap,
        Map.of(t1Id, Set.of(0, 4))
    );
    assertEquals(Set.of(0, 4), subscribedTopicMetadata.assignablePartitions(t1Id));

    // all assignable partitions
    subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
        topicMetadataMap,
        Map.of(t1Id, Set.of(0, 1, 2, 3, 4))
    );
    assertEquals(Set.of(0, 1, 2, 3, 4), subscribedTopicMetadata.assignablePartitions(t1Id));
}
}

View File

@ -19,7 +19,6 @@ package org.apache.kafka.server.share.persister;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import java.util.stream.Collectors;
/**
* This class contains the parameters for {@link Persister#initializeState(InitializeShareGroupStateParameters)}.
@ -36,14 +35,12 @@ public class InitializeShareGroupStateParameters implements PersisterParameters
}
/**
 * Builds the parameters from an initialize share group state request.
 *
 * The group id is copied as is. Each topic in the request becomes a {@link TopicData}
 * whose partitions are created from the request's partition, state epoch and start
 * offset. The topic and partition lists come from {@code Stream.toList()} and are
 * therefore unmodifiable.
 *
 * @param data the request data to convert
 * @return the parameters carrying the request's group, topics and partitions
 */
public static InitializeShareGroupStateParameters from(InitializeShareGroupStateRequestData data) {
    return new Builder().setGroupTopicPartitionData(new GroupTopicPartitionData<>(data.groupId(), data.topics().stream()
        .map(readStateData -> new TopicData<>(readStateData.topicId(),
            readStateData.partitions().stream()
                .map(partitionData -> PartitionFactory.newPartitionStateData(partitionData.partition(), partitionData.stateEpoch(), partitionData.startOffset())).toList()
        )).toList()
    )).build();
}
public static class Builder {
@ -58,4 +55,11 @@ public class InitializeShareGroupStateParameters implements PersisterParameters
return new InitializeShareGroupStateParameters(this.groupTopicPartitionData);
}
}
/**
 * Renders this object as its class name followed by the group topic partition data
 * in braces.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("InitializeShareGroupStateParameters{");
    text.append("groupTopicPartitionData=").append(groupTopicPartitionData);
    text.append('}');
    return text.toString();
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools.consumer.group.share;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordJsonConverters;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatter;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Set;
/**
 * Message formatter that accepts only share group state partition metadata records
 * and renders their key and value as JSON through the generated coordinator record
 * converters.
 */
public class ShareGroupStatePartitionMetadataFormatter extends CoordinatorRecordMessageFormatter {

    /** Ids of the record types this formatter accepts. */
    private static final Set<Short> SUPPORTED_RECORD_TYPES = Set.of(
        CoordinatorRecordType.SHARE_GROUP_STATE_PARTITION_METADATA.id()
    );

    /** Creates the formatter with a {@link GroupCoordinatorRecordSerde}. */
    public ShareGroupStatePartitionMetadataFormatter() {
        super(new GroupCoordinatorRecordSerde());
    }

    /** Returns true only when {@code recordType} is one of the accepted record type ids. */
    @Override
    protected boolean isRecordTypeAllowed(short recordType) {
        return SUPPORTED_RECORD_TYPES.contains(recordType);
    }

    /** Converts the record key to JSON. */
    @Override
    protected JsonNode keyAsJson(ApiMessage message) {
        return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
    }

    /** Converts the record value to JSON for the given version. */
    @Override
    protected JsonNode valueAsJson(ApiMessage message, short version) {
        return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
    }
}