KAFKA-19565: Integration test for Streams-related Admin APIs [2/N] (#20266)

Integration tests for Stream Admin related API

Previous PR: https://github.com/apache/kafka/pull/20244

This one adds:
- Integration test for Admin#listStreamsGroupOffsets API
- Integration test for Admin#deleteStreamsGroupOffsets API
- Integration test for Admin#alterStreamsGroupOffsets API

Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Lucas Brutschy
 <lucasbru@apache.org>
This commit is contained in:
lucliu1108 2025-09-09 03:30:39 -05:00 committed by GitHub
parent 9c9f1446a1
commit f6f5b4cb27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 213 additions and 0 deletions

View File

@ -4521,6 +4521,219 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
Utils.closeQuietly(client, "adminClient")
}
}
@Test
def testListStreamsGroupOffsets(): Unit = {
val streamsGroupId = "stream_group_id"
val testTopicName = "test_topic"
val testNumPartitions = 3
val config = createConfig
client = Admin.create(config)
val producer = createProducer(configOverrides = new Properties())
prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)
// Producer sends messages
for (i <- 1 to 20) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
.get()
producerRecord != null && producerRecord.topic() == testTopicName
}, "Fail to produce record to topic")
}
val streams = createStreamsGroup(
inputTopic = testTopicName,
streamsGroupId = streamsGroupId,
)
try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
}, "Consumer not assigned to partitions")
streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
TestUtils.waitUntilTrue(() => {
val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null)
firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId
}, "Stream group not stable yet")
val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
).partitionsToOffsetAndMetadata(streamsGroupId).get()
assertNotNull(allTopicPartitions)
assertEquals(allTopicPartitions.size(), 3)
allTopicPartitions.forEach((topicPartition, offsetAndMetadata) => {
assertNotNull(topicPartition)
assertNotNull(offsetAndMetadata)
assertTrue(topicPartition.topic().startsWith(testTopicName))
assertTrue(offsetAndMetadata.offset() >= 0)
})
} finally {
Utils.closeQuietly(streams, "streams")
Utils.closeQuietly(client, "adminClient")
Utils.closeQuietly(producer, "producer")
}
}
@Test
def testDeleteStreamsGroupOffsets(): Unit = {
val streamsGroupId = "stream_group_id"
val testTopicName = "test_topic"
val testNumPartitions = 3
val config = createConfig
client = Admin.create(config)
val producer = createProducer(configOverrides = new Properties())
prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)
// Producer sends messages
for (i <- 1 to 20) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
.get()
producerRecord != null && producerRecord.topic() == testTopicName
}, "Fail to produce record to topic")
}
val streams = createStreamsGroup(
inputTopic = testTopicName,
streamsGroupId = streamsGroupId,
)
try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
}, "Consumer not assigned to partitions")
streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
// List streams group offsets
TestUtils.waitUntilTrue(() => {
val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
).partitionsToOffsetAndMetadata(streamsGroupId).get()
allTopicPartitions!=null && allTopicPartitions.size() == testNumPartitions
},"Streams group offsets not ready to list yet")
// Verify running Kstreams group cannot delete its own offsets
var deleteStreamsGroupOffsetsResult = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition(testTopicName, 0)))
assertFutureThrows(classOf[GroupSubscribedToTopicException], deleteStreamsGroupOffsetsResult.all())
// Verity stopped Kstreams group can delete its own offsets
streams.close()
TestUtils.waitUntilTrue(() => {
val groupDescription = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
groupDescription.get(streamsGroupId).groupState() == GroupState.EMPTY
}, "Streams group not closed yet")
deleteStreamsGroupOffsetsResult = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition(testTopicName, 0)))
val res = deleteStreamsGroupOffsetsResult.partitionResult(new TopicPartition(testTopicName, 0)).get()
assertNull(res)
// Verify the group offsets after deletion
val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
).partitionsToOffsetAndMetadata(streamsGroupId).get()
assertEquals(testNumPartitions-1, allTopicPartitions.size())
// Verify non-existing topic partition couldn't be deleted
val deleteStreamsGroupOffsetsResultWithFakeTopic = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition("mock-topic", 1)))
assertFutureThrows(classOf[UnknownTopicOrPartitionException], deleteStreamsGroupOffsetsResultWithFakeTopic.all())
val deleteStreamsGroupOffsetsResultWithFakePartition = client.deleteStreamsGroupOffsets(streamsGroupId, util.Set.of(new TopicPartition(testTopicName, testNumPartitions)))
assertFutureThrows(classOf[UnknownTopicOrPartitionException], deleteStreamsGroupOffsetsResultWithFakePartition.all())
} finally {
Utils.closeQuietly(streams, "streams")
Utils.closeQuietly(client, "adminClient")
Utils.closeQuietly(producer, "producer")
}
}
@Test
def testAlterStreamsGroupOffsets(): Unit = {
val streamsGroupId = "stream_group_id"
val testTopicName = "test_topic"
val testNumPartitions = 3
val config = createConfig
client = Admin.create(config)
val producer = createProducer(configOverrides = new Properties())
prepareTopics(List(testTopicName), testNumPartitions)
prepareRecords(testTopicName)
// Producer sends messages
for (i <- 1 to 20) {
TestUtils.waitUntilTrue(() => {
val producerRecord = producer.send(
new ProducerRecord[Array[Byte], Array[Byte]](testTopicName, s"key-$i".getBytes(), s"value-$i".getBytes()))
.get()
producerRecord != null && producerRecord.topic() == testTopicName
}, "Fail to produce record to topic")
}
val streams = createStreamsGroup(
inputTopic = testTopicName,
streamsGroupId = streamsGroupId,
)
try {
TestUtils.waitUntilTrue(() => {
streams.poll(JDuration.ofMillis(100L))
!streams.assignment().isEmpty
}, "Consumer not assigned to partitions")
streams.poll(JDuration.ofMillis(1000L))
streams.commitSync()
// List streams group offsets
TestUtils.waitUntilTrue(() => {
val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
).partitionsToOffsetAndMetadata(streamsGroupId).get()
allTopicPartitions!=null && allTopicPartitions.size() == testNumPartitions
},"Streams group offsets not ready to list yet")
// Verity stopped Kstreams group can delete its own offsets
streams.close()
TestUtils.waitUntilTrue(() => {
val groupDescription = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
groupDescription.get(streamsGroupId).groupState() == GroupState.EMPTY
}, "Streams group not closed yet")
val offsets = util.Map.of(
new TopicPartition(testTopicName, 0), new OffsetAndMetadata(1L),
new TopicPartition(testTopicName, 1), new OffsetAndMetadata(10L)
)
val alterStreamsGroupOffsetsResult = client.alterStreamsGroupOffsets(streamsGroupId, offsets)
val res0 = alterStreamsGroupOffsetsResult.partitionResult(new TopicPartition(testTopicName, 0)).get()
val res1 = alterStreamsGroupOffsetsResult.partitionResult(new TopicPartition(testTopicName, 1)).get()
assertTrue(res0 == null && res1 == null, "Alter streams group offsets should return null for each partition result")
val allTopicPartitions = client.listStreamsGroupOffsets(
util.Map.of(streamsGroupId, new ListStreamsGroupOffsetsSpec())
).partitionsToOffsetAndMetadata(streamsGroupId).get()
assertNotNull(allTopicPartitions)
assertEquals(testNumPartitions, allTopicPartitions.size())
assertEquals(1L, allTopicPartitions.get(new TopicPartition(testTopicName, 0)).offset())
assertEquals(10L, allTopicPartitions.get(new TopicPartition(testTopicName, 1)).offset())
} finally {
Utils.closeQuietly(streams, "streams")
Utils.closeQuietly(client, "adminClient")
Utils.closeQuietly(producer, "producer")
}
}
}
object PlaintextAdminIntegrationTest {