KAFKA-12267; Implement `DescribeTransactions` API (#10183)

This patch implements the `DescribeTransactions` API as documented in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions. This is only the server-side implementation and does not contain the `Admin` API.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Author: Jason Gustafson, 2021-02-24 12:50:18 -08:00 (committed by GitHub)
parent 059c9b3fcf
commit 3f09fb97b6
20 changed files with 669 additions and 29 deletions
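To make the new API concrete before diving into the diff, here is a minimal sketch of how a caller could build the request using only the classes added in this patch. Because the `Admin` client is out of scope here, there is no high-level entry point yet; the `main` wrapper below is illustrative, while the class names and setters come from the diff itself.

import java.util.Arrays;

import org.apache.kafka.common.message.DescribeTransactionsRequestData;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;

public class DescribeTransactionsSketch {
    public static void main(String[] args) {
        // Version 0 is the only version defined by this patch.
        DescribeTransactionsRequestData data = new DescribeTransactionsRequestData()
            .setTransactionalIds(Arrays.asList("my-transactional-id"));
        DescribeTransactionsRequest request =
            new DescribeTransactionsRequest.Builder(data).build((short) 0);
        // toString(verbose) delegates to the generated data class.
        System.out.println(request.toString(true));
    }
}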


@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
public class TransactionalIdNotFoundException extends ApiException {
public TransactionalIdNotFoundException(String message) {
super(message);
}
}


@@ -105,7 +105,8 @@ public enum ApiKeys {
DESCRIBE_PRODUCERS(ApiMessageType.DESCRIBE_PRODUCERS),
BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false),
BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false),
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true);
UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true),
DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);

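The new enum constant takes its wire id from the generated `ApiMessageType`, which in turn reads `"apiKey": 65` from the JSON definitions added later in this patch. A quick sanity check, as a sketch (it relies only on the public `id` field that `ApiKeys` already exposes):

import org.apache.kafka.common.protocol.ApiKeys;

public class ApiKeyCheck {
    public static void main(String[] args) {
        // The id is populated from ApiMessageType.DESCRIBE_TRANSACTIONS,
        // which carries apiKey 65 from DescribeTransactionsRequest.json.
        System.out.println(ApiKeys.DESCRIBE_TRANSACTIONS.id); // expected: 65
    }
}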

@@ -110,6 +110,7 @@ import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.errors.UnacceptableCredentialException;
import org.apache.kafka.common.errors.UnknownLeaderEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -360,7 +361,8 @@ public enum Errors {
DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new),
BROKER_ID_NOT_REGISTERED(102, "The given broker ID was not registered.", BrokerIdNotRegisteredException::new),
INCONSISTENT_TOPIC_ID(103, "The log's topic ID did not match the topic ID in the request", InconsistentTopicIdException::new),
INCONSISTENT_CLUSTER_ID(104, "The clusterId in the request does not match that found on the server", InconsistentClusterIdException::new);
INCONSISTENT_CLUSTER_ID(104, "The clusterId in the request does not match that found on the server", InconsistentClusterIdException::new),
TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);

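The new constant follows the existing `Errors` conventions: `forException` resolves the exception type to code 105, and `forCode` maps the wire code back to the constant. A small sketch using only helpers that predate this patch:

import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.protocol.Errors;

public class ErrorMappingCheck {
    public static void main(String[] args) {
        // Resolve the exception class to its enum constant and wire code.
        Errors error = Errors.forException(new TransactionalIdNotFoundException("no such id"));
        System.out.println(error);        // TRANSACTIONAL_ID_NOT_FOUND
        System.out.println(error.code()); // 105

        // And map the wire code back to the constant.
        System.out.println(Errors.forCode((short) 105)); // TRANSACTIONAL_ID_NOT_FOUND
    }
}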

@@ -284,6 +284,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return BrokerHeartbeatRequest.parse(buffer, apiVersion);
case UNREGISTER_BROKER:
return UnregisterBrokerRequest.parse(buffer, apiVersion);
case DESCRIBE_TRANSACTIONS:
return DescribeTransactionsRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));


@@ -241,6 +241,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return BrokerHeartbeatResponse.parse(responseBuffer, version);
case UNREGISTER_BROKER:
return UnregisterBrokerResponse.parse(responseBuffer, version);
case DESCRIBE_TRANSACTIONS:
return DescribeTransactionsResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));


@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DescribeTransactionsRequestData;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
public class DescribeTransactionsRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<DescribeTransactionsRequest> {
public final DescribeTransactionsRequestData data;
public Builder(DescribeTransactionsRequestData data) {
super(ApiKeys.DESCRIBE_TRANSACTIONS);
this.data = data;
}
@Override
public DescribeTransactionsRequest build(short version) {
return new DescribeTransactionsRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final DescribeTransactionsRequestData data;
private DescribeTransactionsRequest(DescribeTransactionsRequestData data, short version) {
super(ApiKeys.DESCRIBE_TRANSACTIONS, version);
this.data = data;
}
@Override
public DescribeTransactionsRequestData data() {
return data;
}
@Override
public DescribeTransactionsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
DescribeTransactionsResponseData response = new DescribeTransactionsResponseData()
.setThrottleTimeMs(throttleTimeMs);
for (String transactionalId : data.transactionalIds()) {
DescribeTransactionsResponseData.TransactionState transactionState =
new DescribeTransactionsResponseData.TransactionState()
.setTransactionalId(transactionalId)
.setErrorCode(error.code());
response.transactionStates().add(transactionState);
}
return new DescribeTransactionsResponse(response);
}
public static DescribeTransactionsRequest parse(ByteBuffer buffer, short version) {
return new DescribeTransactionsRequest(new DescribeTransactionsRequestData(
new ByteBufferAccessor(buffer), version), version);
}
@Override
public String toString(boolean verbose) {
return data.toString();
}
}


@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.DescribeTransactionsResponseData.TransactionState;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class DescribeTransactionsResponse extends AbstractResponse {
private final DescribeTransactionsResponseData data;
public DescribeTransactionsResponse(DescribeTransactionsResponseData data) {
super(ApiKeys.DESCRIBE_TRANSACTIONS);
this.data = data;
}
@Override
public DescribeTransactionsResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> errorCounts = new HashMap<>();
for (TransactionState transactionState : data.transactionStates()) {
Errors error = Errors.forCode(transactionState.errorCode());
updateErrorCounts(errorCounts, error);
}
return errorCounts;
}
public static DescribeTransactionsResponse parse(ByteBuffer buffer, short version) {
return new DescribeTransactionsResponse(new DescribeTransactionsResponseData(
new ByteBufferAccessor(buffer), version));
}
@Override
public String toString() {
return data.toString();
}
@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}
}

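Taken together, `getErrorResponse` on the request and `errorCounts` on the response form the standard error-handling round trip: a request-level failure is fanned out into one `TransactionState` per transactional id, and the response aggregates those codes. A sketch (the `main` wrapper is illustrative; the calls come from the two classes above):

import java.util.Arrays;
import java.util.Map;

import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.message.DescribeTransactionsRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsResponse;

public class ErrorFanOutSketch {
    public static void main(String[] args) {
        DescribeTransactionsRequest request = new DescribeTransactionsRequest.Builder(
            new DescribeTransactionsRequestData()
                .setTransactionalIds(Arrays.asList("t1", "t2"))
        ).build((short) 0);

        // The error is replicated into a TransactionState for each of "t1" and "t2".
        DescribeTransactionsResponse response =
            request.getErrorResponse(0, new TransactionalIdNotFoundException("gone"));

        // errorCounts aggregates the per-id error codes.
        Map<Errors, Integer> counts = response.errorCounts();
        System.out.println(counts.get(Errors.TRANSACTIONAL_ID_NOT_FOUND)); // expected: 2
    }
}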

@@ -35,7 +35,7 @@
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The partition error message, which may be null if no additional details are available" },
{ "name": "ActiveProducers", "type": "[]ProducerState", "versions": "0+", "fields": [
{ "name": "ProducerId", "type": "int64", "versions": "0+" },
{ "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
{ "name": "ProducerEpoch", "type": "int32", "versions": "0+" },
{ "name": "LastSequence", "type": "int32", "versions": "0+", "default": "-1" },
{ "name": "LastTimestamp", "type": "int64", "versions": "0+", "default": "-1" },


@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 65,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "DescribeTransactionsRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "TransactionalIds", "entityType": "transactionalId", "type": "[]string", "versions": "0+",
"about": "Array of transactionalIds to include in describe results. If empty, then no results will be returned." }
]
}


@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 65,
"type": "response",
"name": "DescribeTransactionsResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+" },
{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId" },
{ "name": "TransactionState", "type": "string", "versions": "0+" },
{ "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+" },
{ "name": "TransactionStartTimeMs", "type": "int64", "versions": "0+" },
{ "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
{ "name": "ProducerEpoch", "type": "int16", "versions": "0+" },
{ "name": "Topics", "type": "[]TopicData", "versions": "0+",
"about": "The set of partitions included in the current transaction (if active). When a transaction is preparing to commit or abort, this will include only partitions which do not have markers.",
"fields": [
{ "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true },
{ "name": "Partitions", "type": "[]int32", "versions": "0+" }
]
}
]}
]
}


@@ -101,6 +101,8 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
import org.apache.kafka.common.message.DescribeProducersRequestData;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.DescribeTransactionsRequestData;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.EndTxnRequestData;
@@ -549,6 +551,15 @@ public class RequestResponseTest {
}
}
@Test
public void testDescribeTransactionsSerialization() {
for (short v : ApiKeys.DESCRIBE_TRANSACTIONS.allVersions()) {
checkRequest(createDescribeTransactionsRequest(v), true);
checkErrorResponse(createDescribeTransactionsRequest(v), unknownServerException, true);
checkResponse(createDescribeTransactionsResponse(), v, true);
}
}
@Test
public void testDescribeClusterSerialization() {
for (short v : ApiKeys.DESCRIBE_CLUSTER.allVersions()) {
@@ -2754,4 +2765,45 @@ public class RequestResponseTest {
assertEquals(Integer.valueOf(1), createUpdateMetadataResponse().errorCounts().get(Errors.NONE));
assertEquals(Integer.valueOf(1), createWriteTxnMarkersResponse().errorCounts().get(Errors.NONE));
}
private DescribeTransactionsRequest createDescribeTransactionsRequest(short version) {
DescribeTransactionsRequestData data = new DescribeTransactionsRequestData()
.setTransactionalIds(asList("t1", "t2", "t3"));
return new DescribeTransactionsRequest.Builder(data).build(version);
}
private DescribeTransactionsResponse createDescribeTransactionsResponse() {
DescribeTransactionsResponseData data = new DescribeTransactionsResponseData();
data.setTransactionStates(asList(
new DescribeTransactionsResponseData.TransactionState()
.setErrorCode(Errors.NONE.code())
.setTransactionalId("t1")
.setProducerId(12345L)
.setProducerEpoch((short) 15)
.setTransactionStartTimeMs(13490218304L)
.setTransactionState("Empty"),
new DescribeTransactionsResponseData.TransactionState()
.setErrorCode(Errors.NONE.code())
.setTransactionalId("t2")
.setProducerId(98765L)
.setProducerEpoch((short) 30)
.setTransactionStartTimeMs(13490218304L)
.setTransactionState("Ongoing")
.setTopics(new DescribeTransactionsResponseData.TopicDataCollection(
asList(
new DescribeTransactionsResponseData.TopicData()
.setTopic("foo")
.setPartitions(asList(1, 3, 5, 7)),
new DescribeTransactionsResponseData.TopicData()
.setTopic("bar")
.setPartitions(asList(1, 3))
).iterator()
)),
new DescribeTransactionsResponseData.TransactionState()
.setErrorCode(Errors.NOT_COORDINATOR.code())
.setTransactionalId("t3")
));
return new DescribeTransactionsResponse(data);
}
}


@@ -23,6 +23,7 @@ import kafka.server.{KafkaConfig, MetadataCache, ReplicaManager}
import kafka.utils.{Logging, Scheduler}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.DescribeTransactionsResponseData
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
@@ -255,6 +256,51 @@ class TransactionCoordinator(brokerId: Int,
}
}
def handleDescribeTransactions(
transactionalId: String
): DescribeTransactionsResponseData.TransactionState = {
if (transactionalId == null) {
throw new IllegalArgumentException("Invalid null transactionalId")
}
val transactionState = new DescribeTransactionsResponseData.TransactionState()
.setTransactionalId(transactionalId)
if (!isActive.get()) {
transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
} else if (transactionalId.isEmpty) {
transactionState.setErrorCode(Errors.INVALID_REQUEST.code)
} else {
txnManager.getTransactionState(transactionalId) match {
case Left(error) =>
transactionState.setErrorCode(error.code)
case Right(None) =>
transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code)
case Right(Some(coordinatorEpochAndMetadata)) =>
val txnMetadata = coordinatorEpochAndMetadata.transactionMetadata
txnMetadata.inLock {
txnMetadata.topicPartitions.foreach { topicPartition =>
var topicData = transactionState.topics.find(topicPartition.topic)
if (topicData == null) {
topicData = new DescribeTransactionsResponseData.TopicData()
.setTopic(topicPartition.topic)
transactionState.topics.add(topicData)
}
topicData.partitions.add(topicPartition.partition)
}
transactionState
.setErrorCode(Errors.NONE.code)
.setProducerId(txnMetadata.producerId)
.setProducerEpoch(txnMetadata.producerEpoch)
.setTransactionState(txnMetadata.state.name)
.setTransactionTimeoutMs(txnMetadata.txnTimeoutMs)
.setTransactionStartTimeMs(txnMetadata.txnStartTimestamp)
}
}
}
}
def handleAddPartitionsToTransaction(transactionalId: String,
producerId: Long,
producerEpoch: Short,


@@ -25,7 +25,14 @@ import org.apache.kafka.common.record.RecordBatch
import scala.collection.{immutable, mutable}
private[transaction] sealed trait TransactionState { def byte: Byte }
private[transaction] sealed trait TransactionState {
def byte: Byte
/**
* Get the name of this state. This is exposed through the `DescribeTransactions` API.
*/
def name: String
}
/**
* Transaction has not existed yet
@@ -33,7 +40,10 @@ private[transaction] sealed trait TransactionState { def byte: Byte }
* transition: received AddPartitionsToTxnRequest => Ongoing
* received AddOffsetsToTxnRequest => Ongoing
*/
private[transaction] case object Empty extends TransactionState { val byte: Byte = 0 }
private[transaction] case object Empty extends TransactionState {
val byte: Byte = 0
val name: String = "Empty"
}
/**
* Transaction has started and ongoing
@@ -43,46 +53,67 @@ private[transaction] case object Empty extends TransactionState { val byte: Byte
* received AddPartitionsToTxnRequest => Ongoing
* received AddOffsetsToTxnRequest => Ongoing
*/
private[transaction] case object Ongoing extends TransactionState { val byte: Byte = 1 }
private[transaction] case object Ongoing extends TransactionState {
val byte: Byte = 1
val name: String = "Ongoing"
}
/**
* Group is preparing to commit
*
* transition: received acks from all partitions => CompleteCommit
*/
private[transaction] case object PrepareCommit extends TransactionState { val byte: Byte = 2}
private[transaction] case object PrepareCommit extends TransactionState {
val byte: Byte = 2
val name: String = "PrepareCommit"
}
/**
* Group is preparing to abort
*
* transition: received acks from all partitions => CompleteAbort
*/
private[transaction] case object PrepareAbort extends TransactionState { val byte: Byte = 3 }
private[transaction] case object PrepareAbort extends TransactionState {
val byte: Byte = 3
val name: String = "PrepareAbort"
}
/**
* Group has completed commit
*
* Will soon be removed from the ongoing transaction cache
*/
private[transaction] case object CompleteCommit extends TransactionState { val byte: Byte = 4 }
private[transaction] case object CompleteCommit extends TransactionState {
val byte: Byte = 4
val name: String = "CompleteCommit"
}
/**
* Group has completed abort
*
* Will soon be removed from the ongoing transaction cache
*/
private[transaction] case object CompleteAbort extends TransactionState { val byte: Byte = 5 }
private[transaction] case object CompleteAbort extends TransactionState {
val byte: Byte = 5
val name: String = "CompleteAbort"
}
/**
* TransactionalId has expired and is about to be removed from the transaction cache
*/
private[transaction] case object Dead extends TransactionState { val byte: Byte = 6 }
private[transaction] case object Dead extends TransactionState {
val byte: Byte = 6
val name: String = "Dead"
}
/**
* We are in the middle of bumping the epoch and fencing out older producers.
*/
private[transaction] case object PrepareEpochFence extends TransactionState { val byte: Byte = 7}
private[transaction] case object PrepareEpochFence extends TransactionState {
val byte: Byte = 7
val name: String = "PrepareEpochFence"
}
private[transaction] object TransactionMetadata {
def apply(transactionalId: String, producerId: Long, producerEpoch: Short, txnTimeoutMs: Int, timestamp: Long) =


@@ -92,6 +92,7 @@ object RequestConvertToJson {
case req: FetchSnapshotRequest => FetchSnapshotRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeClusterRequest => DescribeClusterRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeProducersRequest => DescribeProducersRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeTransactionsRequest => DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version)
case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " +
"code should be updated to do so.");
}
@@ -164,6 +165,7 @@ object RequestConvertToJson {
case res: FetchSnapshotResponse => FetchSnapshotResponseDataJsonConverter.write(res.data, version)
case res: DescribeClusterResponse => DescribeClusterResponseDataJsonConverter.write(res.data, version)
case res: DescribeProducersResponse => DescribeProducersResponseDataJsonConverter.write(res.data, version)
case res: DescribeTransactionsResponse => DescribeTransactionsResponseDataJsonConverter.write(res.data, version)
case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " +
"code should be updated to do so.");
}


@@ -61,7 +61,7 @@ import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsParti
import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeClusterResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, DescribeProducersResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, OffsetForLeaderEpochResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeClusterResponseData, DescribeConfigsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, DescribeProducersResponseData, DescribeTransactionsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData, ListOffsetsResponseData, ListPartitionReassignmentsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteResponseData, OffsetForLeaderEpochResponseData, RenewDelegationTokenResponseData, SaslAuthenticateResponseData, SaslHandshakeResponseData, StopReplicaResponseData, SyncGroupResponseData, UpdateMetadataResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -222,6 +222,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest)
case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
} catch {
@@ -3274,6 +3275,34 @@ class KafkaApis(val requestChannel: RequestChannel,
"Apache ZooKeeper mode.")
}
def handleDescribeTransactionsRequest(request: RequestChannel.Request): Unit = {
val describeTransactionsRequest = request.body[DescribeTransactionsRequest]
val response = new DescribeTransactionsResponseData()
describeTransactionsRequest.data.transactionalIds.forEach { transactionalId =>
val transactionState = if (!authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, transactionalId)) {
new DescribeTransactionsResponseData.TransactionState()
.setTransactionalId(transactionalId)
.setErrorCode(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code)
} else {
txnCoordinator.handleDescribeTransactions(transactionalId)
}
// Include only partitions which the principal is authorized to describe
val topicIter = transactionState.topics.iterator()
while (topicIter.hasNext) {
val topic = topicIter.next().topic
if (!authHelper.authorize(request.context, DESCRIBE, TOPIC, topic)) {
topicIter.remove()
}
}
response.transactionStates.add(transactionState)
}
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs)))
}
private def updateRecordConversionStats(request: RequestChannel.Request,
tp: TopicPartition,
conversionStats: RecordConversionStats): Unit = {


@@ -36,7 +36,6 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData}
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection}
@@ -44,11 +43,10 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopicCollection
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic, OffsetForLeaderTopicCollection}
import org.apache.kafka.common.message.StopReplicaRequestData.{StopReplicaPartitionState, StopReplicaTopicState}
import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AlterPartitionReassignmentsRequestData, AlterReplicaLogDirsRequestData, ControlledShutdownRequestData, CreateAclsRequestData, CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData, DeleteGroupsRequestData, DeleteRecordsRequestData, DeleteTopicsRequestData, DescribeClusterRequestData, DescribeConfigsRequestData, DescribeGroupsRequestData, DescribeLogDirsRequestData, DescribeProducersRequestData, DescribeTransactionsRequestData, FindCoordinatorRequestData, HeartbeatRequestData, IncrementalAlterConfigsRequestData, JoinGroupRequestData, ListPartitionReassignmentsRequestData, MetadataRequestData, OffsetCommitRequestData, ProduceRequestData, SyncGroupRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, Records, SimpleRecord}
@@ -64,9 +62,9 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable
import scala.collection.mutable.Buffer
import scala.jdk.CollectionConverters._
object AuthorizerIntegrationTest {
val BrokerPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "broker")
@@ -237,6 +235,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
.partitions.asScala.find(_.partitionIndex == part).get
.errorCode
)
}),
ApiKeys.DESCRIBE_TRANSACTIONS -> ((resp: DescribeTransactionsResponse) => {
Errors.forCode(
resp.data
.transactionStates.asScala.find(_.transactionalId == transactionalId).get
.errorCode
)
})
)
@@ -285,7 +290,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> clusterAlterAcl,
ApiKeys.LIST_PARTITION_REASSIGNMENTS -> clusterDescribeAcl,
ApiKeys.OFFSET_DELETE -> groupReadAcl,
ApiKeys.DESCRIBE_PRODUCERS -> topicReadAcl
ApiKeys.DESCRIBE_PRODUCERS -> topicReadAcl,
ApiKeys.DESCRIBE_TRANSACTIONS -> transactionalIdDescribeAcl
)
@BeforeEach
@@ -636,6 +642,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
).asJava)
).build()
private def describeTransactionsRequest: DescribeTransactionsRequest = new DescribeTransactionsRequest.Builder(
new DescribeTransactionsRequestData().setTransactionalIds(List(transactionalId).asJava)
).build()
private def alterPartitionReassignmentsRequest = new AlterPartitionReassignmentsRequest.Builder(
new AlterPartitionReassignmentsRequestData().setTopics(
List(new AlterPartitionReassignmentsRequestData.ReassignableTopic()
@@ -712,6 +722,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> alterPartitionReassignmentsRequest,
ApiKeys.LIST_PARTITION_REASSIGNMENTS -> listPartitionReassignmentsRequest,
ApiKeys.DESCRIBE_PRODUCERS -> describeProducersRequest,
ApiKeys.DESCRIBE_TRANSACTIONS -> describeTransactionsRequest,
// Inter-broker APIs use an invalid broker epoch, so does not affect the test case
ApiKeys.UPDATE_METADATA -> createUpdateMetadataRequest,
@@ -1790,6 +1801,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
assertThrows(classOf[TransactionalIdAuthorizationException], () => producer.commitTransaction())
}
@Test
def shouldNotIncludeUnauthorizedTopicsInDescribeTransactionsResponse(): Unit = {
createTopic(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), transactionalIdResource)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
// Start a transaction and write to a topic.
val producer = buildTransactionalProducer()
producer.initTransactions()
producer.beginTransaction()
producer.send(new ProducerRecord(tp.topic, tp.partition, "1".getBytes, "1".getBytes)).get
// Remove only topic authorization so that we can verify that the
// topic does not get included in the response.
removeAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, WRITE, ALLOW)), topicResource)
val response = connectAndReceive[DescribeTransactionsResponse](describeTransactionsRequest)
assertEquals(1, response.data.transactionStates.size)
val transactionStateData = response.data.transactionStates.asScala.find(_.transactionalId == transactionalId).get
assertEquals("Ongoing", transactionStateData.transactionState)
assertEquals(List.empty, transactionStateData.topics.asScala.toList)
}
@Test
def shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException(): Unit = {
createTopic(topic)
@@ -2114,6 +2147,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
TestUtils.addAndVerifyAcls(servers.head, acls, resource)
}
private def removeAndVerifyAcls(acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
TestUtils.removeAndVerifyAcls(servers.head, acls, resource)
}
private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
numRecords: Int = 1,
startingOffset: Int = 0,


@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import scala.collection.mutable
import scala.jdk.CollectionConverters._
class TransactionCoordinatorTest {
@@ -1078,6 +1079,56 @@ class TransactionCoordinatorTest {
EasyMock.verify(transactionManager)
}
@Test
def testDescribeTransactionsWithEmptyTransactionalId(): Unit = {
coordinator.startup(() => transactionStatePartitionCount, enableTransactionalIdExpiration = false)
val result = coordinator.handleDescribeTransactions("")
assertEquals("", result.transactionalId)
assertEquals(Errors.INVALID_REQUEST, Errors.forCode(result.errorCode))
}
@Test
def testDescribeTransactionsWhileCoordinatorLoading(): Unit = {
EasyMock.expect(transactionManager.getTransactionState(EasyMock.eq(transactionalId)))
.andReturn(Left(Errors.COORDINATOR_LOAD_IN_PROGRESS))
EasyMock.replay(transactionManager)
coordinator.startup(() => transactionStatePartitionCount, enableTransactionalIdExpiration = false)
val result = coordinator.handleDescribeTransactions(transactionalId)
assertEquals(transactionalId, result.transactionalId)
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, Errors.forCode(result.errorCode))
EasyMock.verify(transactionManager)
}
@Test
def testDescribeTransactions(): Unit = {
val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds())
EasyMock.expect(transactionManager.getTransactionState(EasyMock.eq(transactionalId)))
.andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
EasyMock.replay(transactionManager)
coordinator.startup(() => transactionStatePartitionCount, enableTransactionalIdExpiration = false)
val result = coordinator.handleDescribeTransactions(transactionalId)
assertEquals(Errors.NONE, Errors.forCode(result.errorCode))
assertEquals(transactionalId, result.transactionalId)
assertEquals(producerId, result.producerId)
assertEquals(producerEpoch, result.producerEpoch)
assertEquals(txnTimeoutMs, result.transactionTimeoutMs)
assertEquals(time.milliseconds(), result.transactionStartTimeMs)
val addedPartitions = result.topics.asScala.flatMap { topicData =>
topicData.partitions.asScala.map(partition => new TopicPartition(topicData.topic, partition))
}.toSet
assertEquals(partitions, addedPartitions)
EasyMock.verify(transactionManager)
}
private def validateRespondsWithConcurrentTransactionsOnInitPidWhenInPrepareState(state: TransactionState): Unit = {
EasyMock.expect(transactionManager.validateTransactionTimeoutMs(EasyMock.anyInt()))
.andReturn(true).anyTimes()


@@ -3355,7 +3355,128 @@ class KafkaApisTest {
val bazTopic = response.data.topics.asScala.find(_.name == tp3.topic).get
val bazPartition = bazTopic.partitions.asScala.find(_.partitionIndex == tp3.partition).get
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forCode(bazPartition.errorCode))
}
@Test
def testDescribeTransactions(): Unit = {
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
val data = new DescribeTransactionsRequestData()
.setTransactionalIds(List("foo", "bar").asJava)
val describeTransactionsRequest = new DescribeTransactionsRequest.Builder(data).build()
val request = buildRequest(describeTransactionsRequest)
val capturedResponse = expectNoThrottling(request)
def buildExpectedActions(transactionalId: String): util.List[Action] = {
val pattern = new ResourcePattern(ResourceType.TRANSACTIONAL_ID, transactionalId, PatternType.LITERAL)
val action = new Action(AclOperation.DESCRIBE, pattern, 1, true, true)
Collections.singletonList(action)
}
EasyMock.expect(txnCoordinator.handleDescribeTransactions("foo"))
.andReturn(new DescribeTransactionsResponseData.TransactionState()
.setErrorCode(Errors.NONE.code)
.setTransactionalId("foo")
.setProducerId(12345L)
.setProducerEpoch(15)
.setTransactionStartTimeMs(time.milliseconds())
.setTransactionState("CompleteCommit")
.setTransactionTimeoutMs(10000))
EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("foo"))))
.andReturn(Seq(AuthorizationResult.ALLOWED).asJava)
.once()
EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(buildExpectedActions("bar"))))
.andReturn(Seq(AuthorizationResult.DENIED).asJava)
.once()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, authorizer)
createKafkaApis(authorizer = Some(authorizer)).handleDescribeTransactionsRequest(request)
val response = capturedResponse.getValue.asInstanceOf[DescribeTransactionsResponse]
assertEquals(2, response.data.transactionStates.size)
val fooState = response.data.transactionStates.asScala.find(_.transactionalId == "foo").get
assertEquals(Errors.NONE.code, fooState.errorCode)
assertEquals(12345L, fooState.producerId)
assertEquals(15, fooState.producerEpoch)
assertEquals(time.milliseconds(), fooState.transactionStartTimeMs)
assertEquals("CompleteCommit", fooState.transactionState)
assertEquals(10000, fooState.transactionTimeoutMs)
assertEquals(List.empty, fooState.topics.asScala.toList)
val barState = response.data.transactionStates.asScala.find(_.transactionalId == "bar").get
assertEquals(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code, barState.errorCode)
}
@Test
def testDescribeTransactionsFiltersUnauthorizedTopics(): Unit = {
val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
val transactionalId = "foo"
val data = new DescribeTransactionsRequestData()
.setTransactionalIds(List(transactionalId).asJava)
val describeTransactionsRequest = new DescribeTransactionsRequest.Builder(data).build()
val request = buildRequest(describeTransactionsRequest)
val capturedResponse = expectNoThrottling(request)
def expectDescribe(
resourceType: ResourceType,
transactionalId: String,
result: AuthorizationResult
): Unit = {
val pattern = new ResourcePattern(resourceType, transactionalId, PatternType.LITERAL)
val action = new Action(AclOperation.DESCRIBE, pattern, 1, true, true)
val actions = Collections.singletonList(action)
EasyMock.expect(authorizer.authorize(anyObject[RequestContext], EasyMock.eq(actions)))
.andReturn(Seq(result).asJava)
.once()
}
// Principal is authorized to one of the two topics. The second topic should be
// filtered from the result.
expectDescribe(ResourceType.TRANSACTIONAL_ID, transactionalId, AuthorizationResult.ALLOWED)
expectDescribe(ResourceType.TOPIC, "foo", AuthorizationResult.ALLOWED)
expectDescribe(ResourceType.TOPIC, "bar", AuthorizationResult.DENIED)
def mkTopicData(
topic: String,
partitions: Seq[Int]
): DescribeTransactionsResponseData.TopicData = {
new DescribeTransactionsResponseData.TopicData()
.setTopic(topic)
.setPartitions(partitions.map(Int.box).asJava)
}
val describeTransactionsResponse = new DescribeTransactionsResponseData.TransactionState()
.setErrorCode(Errors.NONE.code)
.setTransactionalId(transactionalId)
.setProducerId(12345L)
.setProducerEpoch(15)
.setTransactionStartTimeMs(time.milliseconds())
.setTransactionState("Ongoing")
.setTransactionTimeoutMs(10000)
describeTransactionsResponse.topics.add(mkTopicData(topic = "foo", Seq(1, 2)))
describeTransactionsResponse.topics.add(mkTopicData(topic = "bar", Seq(3, 4)))
EasyMock.expect(txnCoordinator.handleDescribeTransactions("foo"))
.andReturn(describeTransactionsResponse)
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, authorizer)
createKafkaApis(authorizer = Some(authorizer)).handleDescribeTransactionsRequest(request)
val response = capturedResponse.getValue.asInstanceOf[DescribeTransactionsResponse]
assertEquals(1, response.data.transactionStates.size)
val fooState = response.data.transactionStates.asScala.find(_.transactionalId == "foo").get
assertEquals(Errors.NONE.code, fooState.errorCode)
assertEquals(12345L, fooState.producerId)
assertEquals(15, fooState.producerEpoch)
assertEquals(time.milliseconds(), fooState.transactionStartTimeMs)
assertEquals("Ongoing", fooState.transactionState)
assertEquals(10000, fooState.transactionTimeoutMs)
assertEquals(List(mkTopicData(topic = "foo", Seq(1, 2))), fooState.topics.asScala.toList)
}
private def createMockRequest(): RequestChannel.Request = {


@@ -635,6 +635,10 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.UNREGISTER_BROKER =>
new UnregisterBrokerRequest.Builder(new UnregisterBrokerRequestData())
case ApiKeys.DESCRIBE_TRANSACTIONS =>
new DescribeTransactionsRequest.Builder(new DescribeTransactionsRequestData()
.setTransactionalIds(List("test-transactional-id").asJava))
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}


@@ -24,18 +24,19 @@ import java.nio.file.{Files, StandardOpenOption}
import java.security.cert.X509Certificate
import java.time.Duration
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.{Arrays, Collections, Properties}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import java.util.{Arrays, Collections, Properties}
import com.yammer.metrics.core.Meter
import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.log._
import kafka.metrics.KafkaYammerMetrics
import kafka.security.auth.{Acl, Resource, Authorizer => LegacyAuthorizer}
import kafka.server._
import kafka.server.checkpoints.OffsetCheckpointFile
import com.yammer.metrics.core.Meter
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.metrics.KafkaYammerMetrics
import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, MetadataBroker}
import kafka.utils.Implicits._
import kafka.zk._
@@ -57,8 +58,8 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, IntegerSerializer, Serializer}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{KafkaFuture, Node, TopicPartition}
import org.apache.kafka.server.authorizer.{Authorizer => JAuthorizer}
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
@@ -67,11 +68,11 @@ import org.apache.zookeeper.ZooDefs._
import org.apache.zookeeper.data.ACL
import org.junit.jupiter.api.Assertions._
import scala.jdk.CollectionConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Map, Seq, mutable}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.jdk.CollectionConverters._
/**
* Utility functions to help with testing
@@ -1853,4 +1854,18 @@ object TestUtils extends Logging {
authorizer, resource)
}
def removeAndVerifyAcls(server: KafkaServer, acls: Set[AccessControlEntry], resource: ResourcePattern): Unit = {
val authorizer = server.dataPlaneRequestProcessor.authorizer.get
val aclBindingFilters = acls.map { acl => new AclBindingFilter(resource.toFilter, acl.toFilter) }
authorizer.deleteAcls(null, aclBindingFilters.toList.asJava).asScala
.map(_.toCompletableFuture.get)
.foreach { result =>
result.exception.ifPresent { e => throw e }
}
val aclFilter = new AclBindingFilter(resource.toFilter, AccessControlEntryFilter.ANY)
waitAndVerifyAcls(
authorizer.acls(aclFilter).asScala.map(_.entry).toSet -- acls,
authorizer, resource)
}
}