KAFKA-14367; Add `OffsetDelete` to the new `GroupCoordinator` interface (#12902)

This patch adds `OffsetDelete` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
David Jacot authored on 2023-01-17 20:39:01 +01:00, committed by GitHub
commit 700947aa5a (parent b2bc72dc79)
6 changed files with 497 additions and 93 deletions

OffsetDeleteResponse.java

@@ -17,13 +17,17 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
/**
* Possible error codes:
@@ -44,6 +48,83 @@ import java.util.Map;
*/
public class OffsetDeleteResponse extends AbstractResponse {
public static class Builder {
OffsetDeleteResponseData data = new OffsetDeleteResponseData();
private OffsetDeleteResponseTopic getOrCreateTopic(
String topicName
) {
OffsetDeleteResponseTopic topic = data.topics().find(topicName);
if (topic == null) {
topic = new OffsetDeleteResponseTopic().setName(topicName);
data.topics().add(topic);
}
return topic;
}
public Builder addPartition(
String topicName,
int partitionIndex,
Errors error
) {
final OffsetDeleteResponseTopic topicResponse = getOrCreateTopic(topicName);
topicResponse.partitions().add(new OffsetDeleteResponsePartition()
.setPartitionIndex(partitionIndex)
.setErrorCode(error.code()));
return this;
}
public <P> Builder addPartitions(
String topicName,
List<P> partitions,
Function<P, Integer> partitionIndex,
Errors error
) {
final OffsetDeleteResponseTopic topicResponse = getOrCreateTopic(topicName);
partitions.forEach(partition -> {
topicResponse.partitions().add(new OffsetDeleteResponsePartition()
.setPartitionIndex(partitionIndex.apply(partition))
.setErrorCode(error.code()));
});
return this;
}
public Builder merge(
OffsetDeleteResponseData newData
) {
if (data.topics().isEmpty()) {
// If the current data is empty, we can discard it and use the new data.
data = newData;
} else {
// Otherwise, we have to merge them together.
newData.topics().forEach(newTopic -> {
OffsetDeleteResponseTopic existingTopic = data.topics().find(newTopic.name());
if (existingTopic == null) {
// If no topic exists, we can directly copy the new topic data.
data.topics().add(newTopic.duplicate());
} else {
// Otherwise, we add the partitions to the existing one. Note we
// expect non-overlapping partitions here as we don't verify
// if the partition is already in the list before adding it.
newTopic.partitions().forEach(partition -> {
existingTopic.partitions().add(partition.duplicate());
});
}
});
}
return this;
}
public OffsetDeleteResponse build() {
return new OffsetDeleteResponse(data);
}
}
private final OffsetDeleteResponseData data;
public OffsetDeleteResponse(OffsetDeleteResponseData data) {
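As a usage note, here is a minimal sketch of the builder flow this class enables (the topic names, errors, and the coordinatorData parameter are illustrative assumptions, not code from this patch): partitions rejected during request validation are recorded first, and the coordinator's OffsetDeleteResponseData is merged in afterwards, assuming non-overlapping partitions.

static OffsetDeleteResponse buildMergedResponse(OffsetDeleteResponseData coordinatorData) {
    return new OffsetDeleteResponse.Builder()
        // Partitions that failed validation are added up front.
        .addPartition("unauthorized-topic", 0, Errors.TOPIC_AUTHORIZATION_FAILED)
        .addPartition("unknown-topic", 0, Errors.UNKNOWN_TOPIC_OR_PARTITION)
        // The coordinator's result is merged in; partitions must not overlap.
        .merge(coordinatorData)
        .build();
}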

GroupCoordinatorAdapter.scala

@@ -20,7 +20,7 @@ import kafka.common.OffsetAndMetadata
import kafka.server.RequestLocal
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext}
@@ -470,4 +470,45 @@ class GroupCoordinatorAdapter(
expireTimestamp = expireTimestamp
)
}
override def deleteOffsets(
context: RequestContext,
request: OffsetDeleteRequestData,
bufferSupplier: BufferSupplier
): CompletableFuture[OffsetDeleteResponseData] = {
val future = new CompletableFuture[OffsetDeleteResponseData]()
val partitions = mutable.ArrayBuffer[TopicPartition]()
request.topics.forEach { topic =>
topic.partitions.forEach { partition =>
partitions += new TopicPartition(topic.name, partition.partitionIndex)
}
}
val (groupError, topicPartitionResults) = coordinator.handleDeleteOffsets(
request.groupId,
partitions,
RequestLocal(bufferSupplier)
)
if (groupError != Errors.NONE) {
future.completeExceptionally(groupError.exception)
} else {
val response = new OffsetDeleteResponseData()
topicPartitionResults.forKeyValue { (topicPartition, error) =>
var topic = response.topics.find(topicPartition.topic)
if (topic == null) {
topic = new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(topicPartition.topic)
response.topics.add(topic)
}
topic.partitions.add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
.setPartitionIndex(topicPartition.partition)
.setErrorCode(error.code))
}
future.complete(response)
}
future
}
}
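The adapter above turns the old coordinator's flat Map[TopicPartition, Errors] into the nested response layout with a find-or-create step per topic, and maps a group-level error to an exceptionally completed future. A Java sketch of the same fold, assuming a results map standing in for the output of handleDeleteOffsets:

static OffsetDeleteResponseData toResponseData(Map<TopicPartition, Errors> results) {
    OffsetDeleteResponseData response = new OffsetDeleteResponseData();
    results.forEach((tp, error) -> {
        // Reuse the topic entry if an earlier partition already created it.
        OffsetDeleteResponseData.OffsetDeleteResponseTopic topic = response.topics().find(tp.topic());
        if (topic == null) {
            topic = new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setName(tp.topic());
            response.topics().add(topic);
        }
        topic.partitions().add(new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
            .setPartitionIndex(tp.partition())
            .setErrorCode(error.code()));
    });
    return response;
}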

KafkaApis.scala

@@ -225,7 +225,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal)
case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal).exceptionally(handleError)
case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
@@ -3163,61 +3163,69 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
def handleOffsetDeleteRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
def handleOffsetDeleteRequest(
request: RequestChannel.Request,
requestLocal: RequestLocal
): CompletableFuture[Unit] = {
val offsetDeleteRequest = request.body[OffsetDeleteRequest]
val groupId = offsetDeleteRequest.data.groupId
if (authHelper.authorize(request.context, DELETE, GROUP, groupId)) {
val topics = offsetDeleteRequest.data.topics.asScala
val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, topics)(_.name)
if (!authHelper.authorize(request.context, DELETE, GROUP, offsetDeleteRequest.data.groupId)) {
requestHelper.sendMaybeThrottle(request, offsetDeleteRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
val authorizedTopics = authHelper.filterByAuthorized(
request.context,
READ,
TOPIC,
offsetDeleteRequest.data.topics.asScala
)(_.name)
val topicPartitionErrors = mutable.Map[TopicPartition, Errors]()
val topicPartitions = mutable.ArrayBuffer[TopicPartition]()
val responseBuilder = new OffsetDeleteResponse.Builder
val authorizedTopicPartitions = new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection()
offsetDeleteRequest.data.topics.forEach { topic =>
if (!authorizedTopics.contains(topic.name)) {
// If the topic is not authorized, we add the topic and all its partitions
// to the response with TOPIC_AUTHORIZATION_FAILED.
responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED)
} else if (!metadataCache.contains(topic.name)) {
// If the topic is unknown, we add the topic and all its partitions
// to the response with UNKNOWN_TOPIC_OR_PARTITION.
responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
} else {
// Otherwise, we check all partitions to ensure that they all exist.
val topicWithValidPartitions = new OffsetDeleteRequestData.OffsetDeleteRequestTopic().setName(topic.name)
for (topic <- topics) {
for (partition <- topic.partitions.asScala) {
val tp = new TopicPartition(topic.name, partition.partitionIndex)
if (!authorizedTopics.contains(topic.name))
topicPartitionErrors(tp) = Errors.TOPIC_AUTHORIZATION_FAILED
else if (!metadataCache.contains(tp))
topicPartitionErrors(tp) = Errors.UNKNOWN_TOPIC_OR_PARTITION
else
topicPartitions += tp
topic.partitions.forEach { partition =>
if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) {
topicWithValidPartitions.partitions.add(partition)
} else {
responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
}
if (!topicWithValidPartitions.partitions.isEmpty) {
authorizedTopicPartitions.add(topicWithValidPartitions)
}
}
}
val (groupError, authorizedTopicPartitionsErrors) = groupCoordinator.handleDeleteOffsets(
groupId, topicPartitions, requestLocal)
val offsetDeleteRequestData = new OffsetDeleteRequestData()
.setGroupId(offsetDeleteRequest.data.groupId)
.setTopics(authorizedTopicPartitions)
topicPartitionErrors ++= authorizedTopicPartitionsErrors
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
if (groupError != Errors.NONE)
offsetDeleteRequest.getErrorResponse(requestThrottleMs, groupError)
else {
val topics = new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection
topicPartitionErrors.groupBy(_._1.topic).forKeyValue { (topic, topicPartitions) =>
val partitions = new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection
topicPartitions.forKeyValue { (topicPartition, error) =>
partitions.add(
new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
.setPartitionIndex(topicPartition.partition)
.setErrorCode(error.code)
)
}
topics.add(new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
.setName(topic)
.setPartitions(partitions))
}
new OffsetDeleteResponse(new OffsetDeleteResponseData()
.setTopics(topics)
.setThrottleTimeMs(requestThrottleMs))
newGroupCoordinator.deleteOffsets(
request.context,
offsetDeleteRequestData,
requestLocal.bufferSupplier
).handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, offsetDeleteRequest.getErrorResponse(exception))
} else {
requestHelper.sendMaybeThrottle(request, responseBuilder.merge(response).build())
}
})
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
offsetDeleteRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED))
}
}
}
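The new handler performs a three-way check per topic before calling the coordinator: unauthorized topics are failed wholesale with TOPIC_AUTHORIZATION_FAILED, topics absent from the metadata cache are failed with UNKNOWN_TOPIC_OR_PARTITION, and for known topics only the partitions that actually exist are forwarded. A condensed Java sketch of that filtering, with a plain Set and Map standing in for AuthHelper and MetadataCache (both stand-ins are assumptions for illustration):

static OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection filterTopics(
    OffsetDeleteRequestData request,
    Set<String> authorizedTopics,          // stand-in for AuthHelper filtering
    Map<String, Integer> partitionCounts,  // stand-in for the MetadataCache
    OffsetDeleteResponse.Builder responseBuilder
) {
    OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection authorized =
        new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection();
    request.topics().forEach(topic -> {
        if (!authorizedTopics.contains(topic.name())) {
            // Unauthorized: fail every requested partition of the topic.
            responseBuilder.addPartitions(topic.name(), topic.partitions(),
                OffsetDeleteRequestData.OffsetDeleteRequestPartition::partitionIndex,
                Errors.TOPIC_AUTHORIZATION_FAILED);
        } else if (!partitionCounts.containsKey(topic.name())) {
            // Unknown topic: fail every requested partition of the topic.
            responseBuilder.addPartitions(topic.name(), topic.partitions(),
                OffsetDeleteRequestData.OffsetDeleteRequestPartition::partitionIndex,
                Errors.UNKNOWN_TOPIC_OR_PARTITION);
        } else {
            // Known topic: forward existing partitions, fail the rest.
            OffsetDeleteRequestData.OffsetDeleteRequestTopic valid =
                new OffsetDeleteRequestData.OffsetDeleteRequestTopic().setName(topic.name());
            int numPartitions = partitionCounts.get(topic.name());
            topic.partitions().forEach(partition -> {
                if (partition.partitionIndex() < numPartitions) {
                    valid.partitions().add(partition);
                } else {
                    responseBuilder.addPartition(topic.name(), partition.partitionIndex(),
                        Errors.UNKNOWN_TOPIC_OR_PARTITION);
                }
            });
            if (!valid.partitions().isEmpty()) {
                authorized.add(valid);
            }
        }
    });
    return authorized;
}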

GroupCoordinatorAdapterTest.scala

@@ -21,15 +21,19 @@ import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallbac
import kafka.server.RequestLocal
import kafka.utils.MockTime
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.errors.InvalidGroupIdException
import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequestPartition, OffsetDeleteRequestTopic, OffsetDeleteRequestTopicCollection}
import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, RequestHeader}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
@@ -346,6 +350,7 @@ class GroupCoordinatorAdapterTest {
.setProtocolType("qwerty")
).asJava)
assertTrue(future.isDone)
assertEquals(expectedData, future.get())
}
@@ -752,4 +757,109 @@ class GroupCoordinatorAdapterTest {
assertTrue(future.isDone)
assertEquals(expectedData, future.get())
}
@Test
def testDeleteOffsets(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val foo0 = new TopicPartition("foo", 0)
val foo1 = new TopicPartition("foo", 1)
val bar0 = new TopicPartition("bar", 0)
val bar1 = new TopicPartition("bar", 1)
val ctx = makeContext(ApiKeys.OFFSET_DELETE, ApiKeys.OFFSET_DELETE.latestVersion)
val data = new OffsetDeleteRequestData()
.setGroupId("group")
.setTopics(new OffsetDeleteRequestTopicCollection(List(
new OffsetDeleteRequestTopic()
.setName("foo")
.setPartitions(List(
new OffsetDeleteRequestPartition().setPartitionIndex(0),
new OffsetDeleteRequestPartition().setPartitionIndex(1)
).asJava),
new OffsetDeleteRequestTopic()
.setName("bar")
.setPartitions(List(
new OffsetDeleteRequestPartition().setPartitionIndex(0),
new OffsetDeleteRequestPartition().setPartitionIndex(1)
).asJava)
).asJava.iterator))
val bufferSupplier = BufferSupplier.create()
when(groupCoordinator.handleDeleteOffsets(
data.groupId,
Seq(foo0, foo1, bar0, bar1),
RequestLocal(bufferSupplier)
)).thenReturn((
Errors.NONE,
Map(
foo0 -> Errors.NONE,
foo1 -> Errors.NONE,
bar0 -> Errors.GROUP_SUBSCRIBED_TO_TOPIC,
bar1 -> Errors.GROUP_SUBSCRIBED_TO_TOPIC,
)
))
val future = adapter.deleteOffsets(ctx, data, bufferSupplier)
val expectedData = new OffsetDeleteResponseData()
.setTopics(new OffsetDeleteResponseTopicCollection(List(
new OffsetDeleteResponseTopic()
.setName("foo")
.setPartitions(new OffsetDeleteResponsePartitionCollection(List(
new OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code),
new OffsetDeleteResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)
).asJava.iterator)),
new OffsetDeleteResponseTopic()
.setName("bar")
.setPartitions(new OffsetDeleteResponsePartitionCollection(List(
new OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code),
new OffsetDeleteResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code)
).asJava.iterator)),
).asJava.iterator))
assertTrue(future.isDone)
assertEquals(expectedData, future.get())
}
@Test
def testDeleteOffsetsWithGroupLevelError(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
val foo0 = new TopicPartition("foo", 0)
val foo1 = new TopicPartition("foo", 1)
val ctx = makeContext(ApiKeys.OFFSET_DELETE, ApiKeys.OFFSET_DELETE.latestVersion)
val data = new OffsetDeleteRequestData()
.setGroupId("group")
.setTopics(new OffsetDeleteRequestTopicCollection(List(
new OffsetDeleteRequestTopic()
.setName("foo")
.setPartitions(List(
new OffsetDeleteRequestPartition().setPartitionIndex(0),
new OffsetDeleteRequestPartition().setPartitionIndex(1)
).asJava)
).asJava.iterator))
val bufferSupplier = BufferSupplier.create()
when(groupCoordinator.handleDeleteOffsets(
data.groupId,
Seq(foo0, foo1),
RequestLocal(bufferSupplier)
)).thenReturn((Errors.INVALID_GROUP_ID, Map.empty[TopicPartition, Errors]))
val future = adapter.deleteOffsets(ctx, data, bufferSupplier)
assertTrue(future.isDone)
assertTrue(future.isCompletedExceptionally)
assertFutureThrows(future, classOf[InvalidGroupIdException])
}
}

KafkaApisTest.scala

@@ -90,6 +90,7 @@ import scala.collection.{Map, Seq, mutable}
import scala.jdk.CollectionConverters._
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_2_IV0, IBP_2_2_IV1}
import org.apache.kafka.server.log.internals.{AppendOrigin, FetchParams, FetchPartitionData}
@@ -2742,8 +2743,6 @@ class KafkaApisTest {
addTopicToMetadataCache("topic-1", numPartitions = 2)
addTopicToMetadataCache("topic-2", numPartitions = 2)
reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
val topics = new OffsetDeleteRequestTopicCollection()
topics.add(new OffsetDeleteRequestTopic()
.setName("topic-1")
@@ -2764,37 +2763,180 @@ class KafkaApisTest {
val request = buildRequest(offsetDeleteRequest)
val requestLocal = RequestLocal.withThreadConfinedCaching
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
when(groupCoordinator.handleDeleteOffsets(
ArgumentMatchers.eq(group),
ArgumentMatchers.eq(Seq(
new TopicPartition("topic-1", 0),
new TopicPartition("topic-1", 1),
new TopicPartition("topic-2", 0),
new TopicPartition("topic-2", 1)
)),
ArgumentMatchers.eq(requestLocal)
)).thenReturn((Errors.NONE, Map(
new TopicPartition("topic-1", 0) -> Errors.NONE,
new TopicPartition("topic-1", 1) -> Errors.NONE,
new TopicPartition("topic-2", 0) -> Errors.NONE,
new TopicPartition("topic-2", 1) -> Errors.NONE,
)))
val future = new CompletableFuture[OffsetDeleteResponseData]()
when(newGroupCoordinator.deleteOffsets(
request.context,
offsetDeleteRequest.data,
requestLocal.bufferSupplier
)).thenReturn(future)
createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
val offsetDeleteResponseData = new OffsetDeleteResponseData()
.setTopics(new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(List(
new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
.setName("topic-1")
.setPartitions(new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(List(
new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code),
new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)
).asJava.iterator)),
new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
.setName("topic-2")
.setPartitions(new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(List(
new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code),
new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)
).asJava.iterator))
).asJava.iterator()))
future.complete(offsetDeleteResponseData)
val response = verifyNoThrottling[OffsetDeleteResponse](request)
assertEquals(offsetDeleteResponseData, response.data)
}
def errorForPartition(topic: String, partition: Int): Errors = {
Errors.forCode(response.data.topics.find(topic).partitions.find(partition).errorCode)
}
@Test
def testOffsetDeleteTopicsAndPartitionsValidation(): Unit = {
val group = "groupId"
addTopicToMetadataCache("foo", numPartitions = 2)
addTopicToMetadataCache("bar", numPartitions = 2)
assertEquals(2, response.data.topics.size)
assertEquals(Errors.NONE, errorForPartition("topic-1", 0))
assertEquals(Errors.NONE, errorForPartition("topic-1", 1))
assertEquals(Errors.NONE, errorForPartition("topic-2", 0))
assertEquals(Errors.NONE, errorForPartition("topic-2", 1))
val offsetDeleteRequest = new OffsetDeleteRequestData()
.setGroupId(group)
.setTopics(new OffsetDeleteRequestTopicCollection(List(
// foo exists but has only 2 partitions.
new OffsetDeleteRequestTopic()
.setName("foo")
.setPartitions(List(
new OffsetDeleteRequestPartition().setPartitionIndex(0),
new OffsetDeleteRequestPartition().setPartitionIndex(1),
new OffsetDeleteRequestPartition().setPartitionIndex(2)
).asJava),
// bar exists.
new OffsetDeleteRequestTopic()
.setName("bar")
.setPartitions(List(
new OffsetDeleteRequestPartition().setPartitionIndex(0),
new OffsetDeleteRequestPartition().setPartitionIndex(1)
).asJava),
// zar does not exist.
new OffsetDeleteRequestTopic()
.setName("zar")
.setPartitions(List(
new OffsetDeleteRequestPartition().setPartitionIndex(0),
new OffsetDeleteRequestPartition().setPartitionIndex(1)
).asJava),
).asJava.iterator))
val requestChannelRequest = buildRequest(new OffsetDeleteRequest.Builder(offsetDeleteRequest).build())
// This is the request expected by the group coordinator. It contains
// only existing topic-partitions.
val expectedOffsetDeleteRequest = new OffsetDeleteRequestData()
.setGroupId(group)
.setTopics(new OffsetDeleteRequestTopicCollection(List(
new OffsetDeleteRequestTopic()
.setName("foo")
.setPartitions(List(
new OffsetDeleteRequestPartition().setPartitionIndex(0),
new OffsetDeleteRequestPartition().setPartitionIndex(1)
).asJava),
new OffsetDeleteRequestTopic()
.setName("bar")
.setPartitions(List(
new OffsetDeleteRequestPartition().setPartitionIndex(0),
new OffsetDeleteRequestPartition().setPartitionIndex(1)
).asJava)
).asJava.iterator))
val future = new CompletableFuture[OffsetDeleteResponseData]()
when(newGroupCoordinator.deleteOffsets(
requestChannelRequest.context,
expectedOffsetDeleteRequest,
RequestLocal.NoCaching.bufferSupplier
)).thenReturn(future)
createKafkaApis().handle(
requestChannelRequest,
RequestLocal.NoCaching
)
// This is the response returned by the group coordinator.
val offsetDeleteResponse = new OffsetDeleteResponseData()
.setTopics(new OffsetDeleteResponseTopicCollection(List(
new OffsetDeleteResponseTopic()
.setName("foo")
.setPartitions(new OffsetDeleteResponsePartitionCollection(List(
new OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code),
new OffsetDeleteResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)
).asJava.iterator)),
new OffsetDeleteResponseTopic()
.setName("bar")
.setPartitions(new OffsetDeleteResponsePartitionCollection(List(
new OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code),
new OffsetDeleteResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)
).asJava.iterator)),
).asJava.iterator))
val expectedOffsetDeleteResponse = new OffsetDeleteResponseData()
.setTopics(new OffsetDeleteResponseTopicCollection(List(
new OffsetDeleteResponseTopic()
.setName("foo")
.setPartitions(new OffsetDeleteResponsePartitionCollection(List(
// foo-2 is first because partitions failing the validation
// are put in the response first.
new OffsetDeleteResponsePartition()
.setPartitionIndex(2)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
new OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code),
new OffsetDeleteResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)
).asJava.iterator)),
// zar is before bar because topics failing the validation are
// put in the response first.
new OffsetDeleteResponseTopic()
.setName("zar")
.setPartitions(new OffsetDeleteResponsePartitionCollection(List(
new OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code),
new OffsetDeleteResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
).asJava.iterator)),
new OffsetDeleteResponseTopic()
.setName("bar")
.setPartitions(new OffsetDeleteResponsePartitionCollection(List(
new OffsetDeleteResponsePartition()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code),
new OffsetDeleteResponsePartition()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)
).asJava.iterator)),
).asJava.iterator))
future.complete(offsetDeleteResponse)
val response = verifyNoThrottling[OffsetDeleteResponse](requestChannelRequest)
assertEquals(expectedOffsetDeleteResponse, response.data)
}
@Test
@@ -2817,14 +2959,18 @@
.setTopics(topics)
).build()
val request = buildRequest(offsetDeleteRequest)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
val requestLocal = RequestLocal.withThreadConfinedCaching
when(groupCoordinator.handleDeleteOffsets(ArgumentMatchers.eq(group), ArgumentMatchers.eq(Seq.empty),
ArgumentMatchers.eq(requestLocal))).thenReturn((Errors.NONE, Map.empty[TopicPartition, Errors]))
// The group coordinator is called even if there are no
// topic-partitions left after the validation.
when(newGroupCoordinator.deleteOffsets(
request.context,
new OffsetDeleteRequestData().setGroupId(group),
RequestLocal.NoCaching.bufferSupplier
)).thenReturn(CompletableFuture.completedFuture(
new OffsetDeleteResponseData()
))
createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
createKafkaApis().handleOffsetDeleteRequest(request, RequestLocal.NoCaching)
val response = verifyNoThrottling[OffsetDeleteResponse](request)
@@ -2839,22 +2985,24 @@
@Test
def testOffsetDeleteWithInvalidGroup(): Unit = {
val group = "groupId"
reset(groupCoordinator, replicaManager, clientRequestQuotaManager, requestChannel)
val topic = "topic"
addTopicToMetadataCache(topic, numPartitions = 1)
val offsetDeleteRequest = new OffsetDeleteRequest.Builder(
new OffsetDeleteRequestData()
.setGroupId(group)
new OffsetDeleteRequestData().setGroupId(group)
).build()
val request = buildRequest(offsetDeleteRequest)
val requestLocal = RequestLocal.withThreadConfinedCaching
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
when(groupCoordinator.handleDeleteOffsets(ArgumentMatchers.eq(group), ArgumentMatchers.eq(Seq.empty),
ArgumentMatchers.eq(requestLocal))).thenReturn((Errors.GROUP_ID_NOT_FOUND, Map.empty[TopicPartition, Errors]))
val future = new CompletableFuture[OffsetDeleteResponseData]()
when(newGroupCoordinator.deleteOffsets(
request.context,
offsetDeleteRequest.data,
RequestLocal.NoCaching.bufferSupplier
)).thenReturn(future)
createKafkaApis().handleOffsetDeleteRequest(request, requestLocal)
createKafkaApis().handleOffsetDeleteRequest(request, RequestLocal.NoCaching)
future.completeExceptionally(Errors.GROUP_ID_NOT_FOUND.exception)
val response = verifyNoThrottling[OffsetDeleteResponse](request)

GroupCoordinator.java

@@ -28,6 +28,8 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -198,5 +200,19 @@ public interface GroupCoordinator {
TxnOffsetCommitRequestData request,
BufferSupplier bufferSupplier
);
}
/**
* Delete offsets for a given group.
*
* @param context The request context.
* @param request The OffsetDeleteRequest data.
* @param bufferSupplier The buffer supplier tied to the request thread.
*
* @return A future yielding the response or an exception.
*/
CompletableFuture<OffsetDeleteResponseData> deleteOffsets(
RequestContext context,
OffsetDeleteRequestData request,
BufferSupplier bufferSupplier
);
}
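As a usage note, a minimal sketch of driving this method from request-handling code (the coordinator, context, and request values are assumed to come from the surrounding plumbing); it illustrates the contract documented above: a group-level failure completes the future exceptionally, while per-partition error codes travel inside the response data.

static void deleteOffsetsExample(
    GroupCoordinator coordinator,
    RequestContext context,
    OffsetDeleteRequestData request,
    BufferSupplier bufferSupplier
) {
    coordinator.deleteOffsets(context, request, bufferSupplier)
        .whenComplete((response, exception) -> {
            if (exception != null) {
                // Group-level failures, e.g. GROUP_ID_NOT_FOUND, surface here.
            } else {
                // Per-partition error codes are nested under response.topics().
            }
        });
}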