KAFKA-16314: Introducing the AbortableTransactionException (#15486)

As a part of KIP-890, we are introducing a new class of Exceptions which when encountered shall lead to Aborting the ongoing Transaction. The following PR introduces the same with client side handling and server side changes.

On client Side, the code attempts to handle the exception as an Abortable error and ensure that it doesn't take the producer to a fatal state. For each of the Transactional APIs, we have added the appropriate handling. For the produce request, we have verified that the exception transitions the state to Aborted.
On the server side, we have bumped the ProduceRequest, ProduceResponse, TxnOffestCommitRequest and TxnOffsetCommitResponse Version. The appropriate handling on the server side has been added to ensure that the new error case is sent back only for the new clients. The older clients will continue to get the old Invalid_txn_state exception to maintain backward compatibility.

Reviewers: Calvin Liu <caliu@confluent.io>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
Sanskar Jhajharia 2024-03-22 23:56:07 +05:30 committed by GitHub
parent 159d25a7df
commit 2e8d69b78c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 445 additions and 110 deletions

View File

@ -1330,6 +1330,8 @@ public class TransactionManager {
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator, // We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
// just treat it the same as PRODUCE_FENCED. // just treat it the same as PRODUCE_FENCED.
fatalError(Errors.PRODUCER_FENCED.exception()); fatalError(Errors.PRODUCER_FENCED.exception());
} else if (error == Errors.ABORTABLE_TRANSACTION) {
abortableError(error.exception());
} else { } else {
fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message())); fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
} }
@ -1386,10 +1388,8 @@ public class TransactionManager {
// just treat it the same as PRODUCE_FENCED. // just treat it the same as PRODUCE_FENCED.
fatalError(Errors.PRODUCER_FENCED.exception()); fatalError(Errors.PRODUCER_FENCED.exception());
return; return;
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
fatalError(error.exception()); error == Errors.INVALID_TXN_STATE) {
return;
} else if (error == Errors.INVALID_TXN_STATE) {
fatalError(error.exception()); fatalError(error.exception());
return; return;
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
@ -1401,6 +1401,9 @@ public class TransactionManager {
} else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) { } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
abortableErrorIfPossible(error.exception()); abortableErrorIfPossible(error.exception());
return; return;
} else if (error == Errors.ABORTABLE_TRANSACTION) {
abortableError(error.exception());
return;
} else { } else {
log.error("Could not add partition {} due to unexpected error {}", topicPartition, error); log.error("Could not add partition {} due to unexpected error {}", topicPartition, error);
hasPartitionErrors = true; hasPartitionErrors = true;
@ -1504,6 +1507,8 @@ public class TransactionManager {
fatalError(error.exception()); fatalError(error.exception());
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(GroupAuthorizationException.forGroupId(key)); abortableError(GroupAuthorizationException.forGroupId(key));
} else if (error == Errors.ABORTABLE_TRANSACTION) {
abortableError(error.exception());
} else { } else {
fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to " + fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to " +
"unexpected error: %s", coordinatorType, key, "unexpected error: %s", coordinatorType, key,
@ -1552,12 +1557,13 @@ public class TransactionManager {
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator, // We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
// just treat it the same as PRODUCE_FENCED. // just treat it the same as PRODUCE_FENCED.
fatalError(Errors.PRODUCER_FENCED.exception()); fatalError(Errors.PRODUCER_FENCED.exception());
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
fatalError(error.exception()); error == Errors.INVALID_TXN_STATE) {
} else if (error == Errors.INVALID_TXN_STATE) {
fatalError(error.exception()); fatalError(error.exception());
} else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) { } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
abortableErrorIfPossible(error.exception()); abortableErrorIfPossible(error.exception());
} else if (error == Errors.ABORTABLE_TRANSACTION) {
abortableError(error.exception());
} else { } else {
fatalError(new KafkaException("Unhandled error in EndTxnResponse: " + error.message())); fatalError(new KafkaException("Unhandled error in EndTxnResponse: " + error.message()));
} }
@ -1611,12 +1617,13 @@ public class TransactionManager {
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator, // We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
// just treat it the same as PRODUCE_FENCED. // just treat it the same as PRODUCE_FENCED.
fatalError(Errors.PRODUCER_FENCED.exception()); fatalError(Errors.PRODUCER_FENCED.exception());
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
fatalError(error.exception()); error == Errors.INVALID_TXN_STATE) {
} else if (error == Errors.INVALID_TXN_STATE) {
fatalError(error.exception()); fatalError(error.exception());
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId())); abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
} else if (error == Errors.ABORTABLE_TRANSACTION) {
abortableError(error.exception());
} else { } else {
fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message())); fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
} }
@ -1679,7 +1686,8 @@ public class TransactionManager {
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId())); abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
break; break;
} else if (error == Errors.FENCED_INSTANCE_ID) { } else if (error == Errors.FENCED_INSTANCE_ID ||
error == Errors.ABORTABLE_TRANSACTION) {
abortableError(error.exception()); abortableError(error.exception());
break; break;
} else if (error == Errors.UNKNOWN_MEMBER_ID } else if (error == Errors.UNKNOWN_MEMBER_ID

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
public class AbortableTransactionException extends ApiException {
public AbortableTransactionException(String message) {
super(message);
}
}

View File

@ -138,6 +138,7 @@ import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.AbortableTransactionException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -392,7 +393,8 @@ public enum Errors {
UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new), UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new),
UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID.", UnknownSubscriptionIdException::new), UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID.", UnknownSubscriptionIdException::new),
TELEMETRY_TOO_LARGE(118, "Client sent a push telemetry request larger than the maximum size the broker will accept.", TelemetryTooLargeException::new), TELEMETRY_TOO_LARGE(118, "Client sent a push telemetry request larger than the maximum size the broker will accept.", TelemetryTooLargeException::new),
INVALID_REGISTRATION(119, "The controller has considered the broker registration to be invalid.", InvalidRegistrationException::new); INVALID_REGISTRATION(119, "The controller has considered the broker registration to be invalid.", InvalidRegistrationException::new),
ABORTABLE_TRANSACTION(120, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID.", AbortableTransactionException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class); private static final Logger log = LoggerFactory.getLogger(Errors.class);

View File

@ -23,7 +23,9 @@
// Version 2 adds the support for new error code PRODUCER_FENCED. // Version 2 adds the support for new error code PRODUCER_FENCED.
// //
// Version 3 enables flexible versions. // Version 3 enables flexible versions.
"validVersions": "0-3", //
// Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+", "flexibleVersions": "3+",
"fields": [ "fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId", { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",

View File

@ -22,7 +22,9 @@
// Version 2 adds the support for new error code PRODUCER_FENCED. // Version 2 adds the support for new error code PRODUCER_FENCED.
// //
// Version 3 enables flexible versions. // Version 3 enables flexible versions.
"validVersions": "0-3", //
// Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+", "flexibleVersions": "3+",
"fields": [ "fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

View File

@ -25,9 +25,11 @@
// Version 3 enables flexible versions. // Version 3 enables flexible versions.
// //
// Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions. // Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions.
//
// Version 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
// Versions 3 and below will be exclusively used by clients and versions 4 and above will be used by brokers. // Versions 3 and below will be exclusively used by clients and versions 4 and above will be used by brokers.
"latestVersionUnstable": false, "latestVersionUnstable": false,
"validVersions": "0-4", "validVersions": "0-5",
"flexibleVersions": "3+", "flexibleVersions": "3+",
"fields": [ "fields": [
{ "name": "Transactions", "type": "[]AddPartitionsToTxnTransaction", "versions": "4+", { "name": "Transactions", "type": "[]AddPartitionsToTxnTransaction", "versions": "4+",

View File

@ -24,7 +24,9 @@
// Version 3 enables flexible versions. // Version 3 enables flexible versions.
// //
// Version 4 adds support to batch multiple transactions and a top level error code. // Version 4 adds support to batch multiple transactions and a top level error code.
"validVersions": "0-4", //
// Version 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-5",
"flexibleVersions": "3+", "flexibleVersions": "3+",
"fields": [ "fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

View File

@ -23,7 +23,9 @@
// Version 2 adds the support for new error code PRODUCER_FENCED. // Version 2 adds the support for new error code PRODUCER_FENCED.
// //
// Version 3 enables flexible versions. // Version 3 enables flexible versions.
"validVersions": "0-3", //
// Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+", "flexibleVersions": "3+",
"fields": [ "fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId", { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",

View File

@ -22,7 +22,9 @@
// Version 2 adds the support for new error code PRODUCER_FENCED. // Version 2 adds the support for new error code PRODUCER_FENCED.
// //
// Version 3 enables flexible versions. // Version 3 enables flexible versions.
"validVersions": "0-3", //
// Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+", "flexibleVersions": "3+",
"fields": [ "fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

View File

@ -25,7 +25,9 @@
// Version 3 is the first flexible version. // Version 3 is the first flexible version.
// //
// Version 4 adds support for batching via CoordinatorKeys (KIP-699) // Version 4 adds support for batching via CoordinatorKeys (KIP-699)
"validVersions": "0-4", //
// Version 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-5",
"deprecatedVersions": "0", "deprecatedVersions": "0",
"flexibleVersions": "3+", "flexibleVersions": "3+",
"fields": [ "fields": [

View File

@ -24,7 +24,9 @@
// Version 3 is the first flexible version. // Version 3 is the first flexible version.
// //
// Version 4 adds support for batching via Coordinators (KIP-699) // Version 4 adds support for batching via Coordinators (KIP-699)
"validVersions": "0-4", //
// Version 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-5",
"flexibleVersions": "3+", "flexibleVersions": "3+",
"fields": [ "fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,

View File

@ -25,7 +25,9 @@
// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error // Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error
// //
// Version 4 adds the support for new error code PRODUCER_FENCED. // Version 4 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-4", //
// Verison 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-5",
"flexibleVersions": "2+", "flexibleVersions": "2+",
"fields": [ "fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+", "entityType": "transactionalId", { "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+", "entityType": "transactionalId",

View File

@ -24,7 +24,9 @@
// Version 3 is the same as version 2. // Version 3 is the same as version 2.
// //
// Version 4 adds the support for new error code PRODUCER_FENCED. // Version 4 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-4", //
// Version 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-5",
"flexibleVersions": "2+", "flexibleVersions": "2+",
"fields": [ "fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,

View File

@ -35,7 +35,9 @@
// Version 9 enables flexible versions. // Version 9 enables flexible versions.
// //
// Version 10 is the same as version 9 (KIP-951). // Version 10 is the same as version 9 (KIP-951).
"validVersions": "0-10", //
// Version 11 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-11",
"deprecatedVersions": "0-6", "deprecatedVersions": "0-6",
"flexibleVersions": "9+", "flexibleVersions": "9+",
"fields": [ "fields": [

View File

@ -34,7 +34,9 @@
// Version 9 enables flexible versions. // Version 9 enables flexible versions.
// //
// Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951) // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951)
"validVersions": "0-10", //
// Version 11 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-11",
"flexibleVersions": "9+", "flexibleVersions": "9+",
"fields": [ "fields": [
{ "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+", { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",

View File

@ -23,7 +23,9 @@
// Version 2 adds the committed leader epoch. // Version 2 adds the committed leader epoch.
// //
// Version 3 adds the member.id, group.instance.id and generation.id. // Version 3 adds the member.id, group.instance.id and generation.id.
"validVersions": "0-3", //
// Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+", "flexibleVersions": "3+",
"fields": [ "fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId", { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",

View File

@ -22,7 +22,9 @@
// Version 2 is the same as version 1. // Version 2 is the same as version 1.
// //
// Version 3 adds illegal generation, fenced instance id, and unknown member id errors. // Version 3 adds illegal generation, fenced instance id, and unknown member id errors.
"validVersions": "0-3", //
// Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+", "flexibleVersions": "3+",
"fields": [ "fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

View File

@ -42,6 +42,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionAbortedException; import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.AbortableTransactionException;
import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData; import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
import org.apache.kafka.common.message.ApiMessageType; import org.apache.kafka.common.message.ApiMessageType;
@ -3146,6 +3147,45 @@ public class SenderTest {
txnManager.beginTransaction(); txnManager.beginTransaction();
} }
@Test
public void testAbortableTxnExceptionIsAnAbortableError() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
TransactionManager txnManager = new TransactionManager(logContext, "textAbortableTxnException", 60000, 100, apiVersions);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);
client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
sender.runOnce();
Future<RecordMetadata> request = appendToAccumulator(tp0);
sender.runOnce(); // send request
sendIdempotentProducerResponse(0, tp0, Errors.ABORTABLE_TRANSACTION, -1);
// Return AbortableTransactionException error. It should be abortable.
sender.runOnce();
assertFutureFailure(request, AbortableTransactionException.class);
assertTrue(txnManager.hasAbortableError());
TransactionalRequestResult result = txnManager.beginAbort();
sender.runOnce();
// Once the transaction is aborted, we should be able to begin a new one.
respondToEndTxn(Errors.NONE);
sender.runOnce();
assertTrue(txnManager::isInitializing);
prepareInitProducerResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
sender.runOnce();
assertTrue(txnManager::isReady);
assertTrue(result.isSuccessful());
result.await();
txnManager.beginTransaction();
}
@Test @Test
public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception { public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws Exception {
Metrics m = new Metrics(); Metrics m = new Metrics();

View File

@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.AbortableTransactionException;
import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData; import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
@ -3514,6 +3515,126 @@ public class TransactionManagerTest {
assertFalse(transactionManager.hasOngoingTransaction()); assertFalse(transactionManager.hasOngoingTransaction());
} }
@Test
public void testAbortableTransactionExceptionInInitProducerId() {
TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
prepareInitPidResponse(Errors.ABORTABLE_TRANSACTION, false, producerId, RecordBatch.NO_PRODUCER_EPOCH);
runUntil(transactionManager::hasError);
assertTrue(initPidResult.isCompleted());
assertFalse(initPidResult.isSuccessful());
assertThrows(AbortableTransactionException.class, initPidResult::await);
assertAbortableError(AbortableTransactionException.class);
}
@Test
public void testAbortableTransactionExceptionInAddPartitions() {
final TopicPartition tp = new TopicPartition("foo", 0);
doInitTransactions();
transactionManager.beginTransaction();
transactionManager.maybeAddPartition(tp);
prepareAddPartitionsToTxn(tp, Errors.ABORTABLE_TRANSACTION);
runUntil(transactionManager::hasError);
assertTrue(transactionManager.lastError() instanceof AbortableTransactionException);
assertAbortableError(AbortableTransactionException.class);
}
@Test
public void testAbortableTransactionExceptionInFindCoordinator() {
doInitTransactions();
transactionManager.beginTransaction();
TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), new ConsumerGroupMetadata(consumerGroupId));
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch);
runUntil(() -> !transactionManager.hasPartitionsToAdd());
prepareFindCoordinatorResponse(Errors.ABORTABLE_TRANSACTION, false, CoordinatorType.GROUP, consumerGroupId);
runUntil(transactionManager::hasError);
assertTrue(transactionManager.lastError() instanceof AbortableTransactionException);
runUntil(sendOffsetsResult::isCompleted);
assertFalse(sendOffsetsResult.isSuccessful());
assertTrue(sendOffsetsResult.error() instanceof AbortableTransactionException);
assertAbortableError(AbortableTransactionException.class);
}
@Test
public void testAbortableTransactionExceptionInEndTxn() throws InterruptedException {
doInitTransactions();
transactionManager.beginTransaction();
transactionManager.maybeAddPartition(tp0);
TransactionalRequestResult commitResult = transactionManager.beginCommit();
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
prepareProduceResponse(Errors.NONE, producerId, epoch);
prepareEndTxnResponse(Errors.ABORTABLE_TRANSACTION, TransactionResult.COMMIT, producerId, epoch);
runUntil(commitResult::isCompleted);
runUntil(responseFuture::isDone);
assertThrows(KafkaException.class, commitResult::await);
assertFalse(commitResult.isSuccessful());
assertTrue(commitResult.isAcked());
assertAbortableError(AbortableTransactionException.class);
}
@Test
public void testAbortableTransactionExceptionInAddOffsetsToTxn() {
final TopicPartition tp = new TopicPartition("foo", 0);
doInitTransactions();
transactionManager.beginTransaction();
TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
singletonMap(tp, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata(consumerGroupId));
prepareAddOffsetsToTxnResponse(Errors.ABORTABLE_TRANSACTION, consumerGroupId, producerId, epoch);
runUntil(transactionManager::hasError);
assertTrue(transactionManager.lastError() instanceof AbortableTransactionException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
assertTrue(sendOffsetsResult.error() instanceof AbortableTransactionException);
assertAbortableError(AbortableTransactionException.class);
}
@Test
public void testAbortableTransactionExceptionInTxnOffsetCommit() {
final TopicPartition tp = new TopicPartition("foo", 0);
doInitTransactions();
transactionManager.beginTransaction();
TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
singletonMap(tp, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata(consumerGroupId));
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch);
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, singletonMap(tp, Errors.ABORTABLE_TRANSACTION));
runUntil(transactionManager::hasError);
assertTrue(transactionManager.lastError() instanceof AbortableTransactionException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
assertTrue(sendOffsetsResult.error() instanceof AbortableTransactionException);
assertAbortableError(AbortableTransactionException.class);
}
private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException { private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException {
final long nowMs = time.milliseconds(); final long nowMs = time.milliseconds();
return accumulator.append(tp.topic(), tp.partition(), nowMs, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, return accumulator.append(tp.topic(), tp.partition(), nowMs, "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS,

View File

@ -17,7 +17,7 @@
package kafka.coordinator.group package kafka.coordinator.group
import kafka.cluster.PartitionListener import kafka.cluster.PartitionListener
import kafka.server.{ActionQueue, ReplicaManager, RequestLocal} import kafka.server.{ActionQueue, ReplicaManager, RequestLocal, defaultError, genericError}
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RecordTooLargeException import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
@ -188,8 +188,10 @@ class CoordinatorPartitionWriter[T](
tp: TopicPartition, tp: TopicPartition,
transactionalId: String, transactionalId: String,
producerId: Long, producerId: Long,
producerEpoch: Short producerEpoch: Short,
apiVersion: Short
): CompletableFuture[VerificationGuard] = { ): CompletableFuture[VerificationGuard] = {
val supportedOperation = if (apiVersion >= 4) genericError else defaultError
val future = new CompletableFuture[VerificationGuard]() val future = new CompletableFuture[VerificationGuard]()
replicaManager.maybeStartTransactionVerificationForPartition( replicaManager.maybeStartTransactionVerificationForPartition(
topicPartition = tp, topicPartition = tp,
@ -204,7 +206,8 @@ class CoordinatorPartitionWriter[T](
} else { } else {
future.complete(verificationGuard) future.complete(verificationGuard)
} }
} },
supportedOperation
) )
future future
} }

View File

@ -905,7 +905,8 @@ private[group] class GroupCoordinator(
generationId: Int, generationId: Int,
offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { requestLocal: RequestLocal = RequestLocal.NoCaching,
apiVersion: Short): Unit = {
validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match { validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error }) case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
case None => case None =>
@ -928,7 +929,7 @@ private[group] class GroupCoordinator(
offsetTopicPartition, offsetMetadata, newRequestLocal, responseCallback, Some(verificationGuard)) offsetTopicPartition, offsetMetadata, newRequestLocal, responseCallback, Some(verificationGuard))
} }
} }
val supportedOperation = if (apiVersion >= 4) genericError else defaultError
groupManager.replicaManager.maybeStartTransactionVerificationForPartition( groupManager.replicaManager.maybeStartTransactionVerificationForPartition(
topicPartition = offsetTopicPartition, topicPartition = offsetTopicPartition,
transactionalId, transactionalId,
@ -941,7 +942,8 @@ private[group] class GroupCoordinator(
KafkaRequestHandler.wrapAsyncCallback( KafkaRequestHandler.wrapAsyncCallback(
postVerificationCallback, postVerificationCallback,
requestLocal requestLocal
) ),
supportedOperation
) )
} }
} }

View File

@ -476,7 +476,8 @@ private[group] class GroupCoordinatorAdapter(
request.generationId, request.generationId,
partitions.toMap, partitions.toMap,
callback, callback,
RequestLocal(bufferSupplier) RequestLocal(bufferSupplier),
context.apiVersion()
) )
future future

View File

@ -369,7 +369,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
if (txnMetadata.topicPartitions.contains(part)) if (txnMetadata.topicPartitions.contains(part))
(part, Errors.NONE) (part, Errors.NONE)
else else
(part, Errors.INVALID_TXN_STATE) (part, Errors.ABORTABLE_TRANSACTION)
}.toMap) }.toMap)
} }
} }

View File

@ -42,6 +42,17 @@ object AddPartitionsToTxnManager {
val VerificationTimeMsMetricName = "VerificationTimeMs" val VerificationTimeMsMetricName = "VerificationTimeMs"
} }
/**
* This is an enum which handles the Partition Response based on the Request Version and the exact operation
* defaultError: This is the default workflow which maps to cases when the Produce Request Version or the Txn_offset_commit request was lower than the first version supporting the new Error Class
* genericError: This maps to the case when the clients are updated to handle the AbortableTxnException
* addPartition: This is a WIP. To be updated as a part of KIP-890 Part 2
*/
sealed trait SupportedOperation
case object defaultError extends SupportedOperation
case object genericError extends SupportedOperation
case object addPartition extends SupportedOperation
/* /*
* Data structure to hold the transactional data to send to a node. Note -- at most one request per transactional ID * Data structure to hold the transactional data to send to a node. Note -- at most one request per transactional ID
* will exist at a time in the map. If a given transactional ID exists in the map, and a new request with the same ID * will exist at a time in the map. If a given transactional ID exists in the map, and a new request with the same ID
@ -49,7 +60,8 @@ object AddPartitionsToTxnManager {
*/ */
class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection,
val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback], val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback],
val startTimeMs: mutable.Map[String, Long]) val startTimeMs: mutable.Map[String, Long],
val supportedOperation: SupportedOperation)
class AddPartitionsToTxnManager( class AddPartitionsToTxnManager(
config: KafkaConfig, config: KafkaConfig,
@ -79,7 +91,8 @@ class AddPartitionsToTxnManager(
producerId: Long, producerId: Long,
producerEpoch: Short, producerEpoch: Short,
topicPartitions: Seq[TopicPartition], topicPartitions: Seq[TopicPartition],
callback: AddPartitionsToTxnManager.AppendCallback callback: AddPartitionsToTxnManager.AppendCallback,
supportedOperation: SupportedOperation
): Unit = { ): Unit = {
val coordinatorNode = getTransactionCoordinator(partitionFor(transactionalId)) val coordinatorNode = getTransactionCoordinator(partitionFor(transactionalId))
if (coordinatorNode.isEmpty) { if (coordinatorNode.isEmpty) {
@ -99,14 +112,16 @@ class AddPartitionsToTxnManager(
.setVerifyOnly(true) .setVerifyOnly(true)
.setTopics(topicCollection) .setTopics(topicCollection)
addTxnData(coordinatorNode.get, transactionData, callback) addTxnData(coordinatorNode.get, transactionData, callback, supportedOperation)
} }
} }
private def addTxnData( private def addTxnData(
node: Node, node: Node,
transactionData: AddPartitionsToTxnTransaction, transactionData: AddPartitionsToTxnTransaction,
callback: AddPartitionsToTxnManager.AppendCallback callback: AddPartitionsToTxnManager.AppendCallback,
supportedOperation: SupportedOperation
): Unit = { ): Unit = {
nodesToTransactions.synchronized { nodesToTransactions.synchronized {
val curTime = time.milliseconds() val curTime = time.milliseconds()
@ -115,7 +130,8 @@ class AddPartitionsToTxnManager(
new TransactionDataAndCallbacks( new TransactionDataAndCallbacks(
new AddPartitionsToTxnTransactionCollection(1), new AddPartitionsToTxnTransactionCollection(1),
mutable.Map[String, AddPartitionsToTxnManager.AppendCallback](), mutable.Map[String, AddPartitionsToTxnManager.AppendCallback](),
mutable.Map[String, Long]())) mutable.Map[String, Long](),
supportedOperation))
val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId) val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
@ -210,6 +226,8 @@ class AddPartitionsToTxnManager(
val code = val code =
if (partitionResult.partitionErrorCode == Errors.PRODUCER_FENCED.code) if (partitionResult.partitionErrorCode == Errors.PRODUCER_FENCED.code)
Errors.INVALID_PRODUCER_EPOCH.code Errors.INVALID_PRODUCER_EPOCH.code
else if (partitionResult.partitionErrorCode() == Errors.ABORTABLE_TRANSACTION.code && transactionDataAndCallbacks.supportedOperation != genericError) // For backward compatibility with clients.
Errors.INVALID_TXN_STATE.code
else else
partitionResult.partitionErrorCode partitionResult.partitionErrorCode
unverified.put(tp, Errors.forCode(code)) unverified.put(tp, Errors.forCode(code))

View File

@ -721,7 +721,7 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseCallback(Map.empty) sendResponseCallback(Map.empty)
else { else {
val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID val internalTopicsAllowed = request.header.clientId == AdminUtils.ADMIN_CLIENT_ID
val supportedOperation = if (request.header.apiVersion > 10) genericError else defaultError
// call the replica manager to append messages to the replicas // call the replica manager to append messages to the replicas
replicaManager.handleProduceAppend( replicaManager.handleProduceAppend(
timeout = produceRequest.timeout.toLong, timeout = produceRequest.timeout.toLong,
@ -731,7 +731,8 @@ class KafkaApis(val requestChannel: RequestChannel,
entriesPerPartition = authorizedRequestInfo, entriesPerPartition = authorizedRequestInfo,
responseCallback = sendResponseCallback, responseCallback = sendResponseCallback,
recordValidationStatsCallback = processingStatsCallback, recordValidationStatsCallback = processingStatsCallback,
requestLocal = requestLocal) requestLocal = requestLocal,
supportedOperation = supportedOperation)
// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
// hence we clear its data here in order to let GC reclaim its memory since it is already appended to log // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log

View File

@ -795,6 +795,7 @@ class ReplicaManager(val config: KafkaConfig,
* @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
* thread calling this method * thread calling this method
* @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default. * @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
* @param supportedOperation determines the supported Operation based on the client's Request api version
* *
* The responseCallback is wrapped so that it is scheduled on a request handler thread. There, it should be called with * The responseCallback is wrapped so that it is scheduled on a request handler thread. There, it should be called with
* that request handler thread's thread local and not the one supplied to this method. * that request handler thread's thread local and not the one supplied to this method.
@ -807,7 +808,8 @@ class ReplicaManager(val config: KafkaConfig,
responseCallback: Map[TopicPartition, PartitionResponse] => Unit, responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (), recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
requestLocal: RequestLocal = RequestLocal.NoCaching, requestLocal: RequestLocal = RequestLocal.NoCaching,
actionQueue: ActionQueue = this.defaultActionQueue): Unit = { actionQueue: ActionQueue = this.defaultActionQueue,
supportedOperation: SupportedOperation): Unit = {
val transactionalProducerInfo = mutable.HashSet[(Long, Short)]() val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]() val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
@ -884,7 +886,8 @@ class ReplicaManager(val config: KafkaConfig,
KafkaRequestHandler.wrapAsyncCallback( KafkaRequestHandler.wrapAsyncCallback(
postVerificationCallback, postVerificationCallback,
requestLocal requestLocal
) ),
supportedOperation
) )
} }
@ -975,12 +978,13 @@ class ReplicaManager(val config: KafkaConfig,
/** /**
* *
* @param topicPartition the topic partition to maybe verify * @param topicPartition the topic partition to maybe verify
* @param transactionalId the transactional id for the transaction * @param transactionalId the transactional id for the transaction
* @param producerId the producer id for the producer writing to the transaction * @param producerId the producer id for the producer writing to the transaction
* @param producerEpoch the epoch of the producer writing to the transaction * @param producerEpoch the epoch of the producer writing to the transaction
* @param baseSequence the base sequence of the first record in the batch we are trying to append * @param baseSequence the base sequence of the first record in the batch we are trying to append
* @param callback the method to execute once the verification is either completed or returns an error * @param callback the method to execute once the verification is either completed or returns an error
* @param supportedOperation determines the supported operation based on the client's Request API version
* *
* When the verification returns, the callback will be supplied the error if it exists or Errors.NONE. * When the verification returns, the callback will be supplied the error if it exists or Errors.NONE.
* If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned. * If the verification guard exists, it will also be supplied. Otherwise the SENTINEL verification guard will be returned.
@ -992,7 +996,8 @@ class ReplicaManager(val config: KafkaConfig,
producerId: Long, producerId: Long,
producerEpoch: Short, producerEpoch: Short,
baseSequence: Int, baseSequence: Int,
callback: ((Errors, VerificationGuard)) => Unit callback: ((Errors, VerificationGuard)) => Unit,
supportedOperation: SupportedOperation
): Unit = { ): Unit = {
def generalizedCallback(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = { def generalizedCallback(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = {
val (preAppendErrors, verificationGuards) = results val (preAppendErrors, verificationGuards) = results
@ -1007,7 +1012,8 @@ class ReplicaManager(val config: KafkaConfig,
transactionalId, transactionalId,
producerId, producerId,
producerEpoch, producerEpoch,
generalizedCallback generalizedCallback,
supportedOperation
) )
} }
@ -1018,6 +1024,7 @@ class ReplicaManager(val config: KafkaConfig,
* @param producerId the producer id for the producer writing to the transaction * @param producerId the producer id for the producer writing to the transaction
* @param producerEpoch the epoch of the producer writing to the transaction * @param producerEpoch the epoch of the producer writing to the transaction
* @param callback the method to execute once the verification is either completed or returns an error * @param callback the method to execute once the verification is either completed or returns an error
* @param supportedOperation determines the supported operation based on the client's Request API version
* *
* When the verification returns, the callback will be supplied the errors per topic partition if there were errors. * When the verification returns, the callback will be supplied the errors per topic partition if there were errors.
* The callback will also be supplied the verification guards per partition if they exist. It is possible to have an * The callback will also be supplied the verification guards per partition if they exist. It is possible to have an
@ -1029,7 +1036,8 @@ class ReplicaManager(val config: KafkaConfig,
transactionalId: String, transactionalId: String,
producerId: Long, producerId: Long,
producerEpoch: Short, producerEpoch: Short,
callback: ((Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])) => Unit callback: ((Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])) => Unit,
supportedOperation: SupportedOperation
): Unit = { ): Unit = {
// Skip verification if the request is not transactional or transaction verification is disabled. // Skip verification if the request is not transactional or transaction verification is disabled.
if (transactionalId == null || if (transactionalId == null ||
@ -1075,7 +1083,8 @@ class ReplicaManager(val config: KafkaConfig,
producerId = producerId, producerId = producerId,
producerEpoch = producerEpoch, producerEpoch = producerEpoch,
topicPartitions = verificationGuards.keys.toSeq, topicPartitions = verificationGuards.keys.toSeq,
callback = invokeCallback callback = invokeCallback,
supportedOperation = supportedOperation
)) ))
} }

View File

@ -200,7 +200,8 @@ object AbstractCoordinatorConcurrencyTest {
producerId: Long, producerId: Long,
producerEpoch: Short, producerEpoch: Short,
baseSequence: Int, baseSequence: Int,
callback: ((Errors, VerificationGuard)) => Unit callback: ((Errors, VerificationGuard)) => Unit,
supportedOperation: SupportedOperation
): Unit = { ): Unit = {
// Skip verification // Skip verification
callback((Errors.NONE, VerificationGuard.SENTINEL)) callback((Errors.NONE, VerificationGuard.SENTINEL))

View File

@ -21,7 +21,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{NotLeaderOrFollowerException, RecordTooLargeException} import org.apache.kafka.common.errors.{NotLeaderOrFollowerException, RecordTooLargeException}
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, MemoryRecords, RecordBatch} import org.apache.kafka.common.record.{CompressionType, ControlRecordType, MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.requests.TransactionResult
@ -343,7 +343,8 @@ class CoordinatorPartitionWriterTest {
ArgumentMatchers.eq(10L), ArgumentMatchers.eq(10L),
ArgumentMatchers.eq(5.toShort), ArgumentMatchers.eq(5.toShort),
ArgumentMatchers.eq(RecordBatch.NO_SEQUENCE), ArgumentMatchers.eq(RecordBatch.NO_SEQUENCE),
callbackCapture.capture() callbackCapture.capture(),
ArgumentMatchers.any()
)).thenAnswer(_ => { )).thenAnswer(_ => {
callbackCapture.getValue.apply(( callbackCapture.getValue.apply((
error, error,
@ -355,7 +356,8 @@ class CoordinatorPartitionWriterTest {
tp, tp,
"transactional-id", "transactional-id",
10L, 10L,
5.toShort 5.toShort,
ApiKeys.TXN_OFFSET_COMMIT.latestVersion()
) )
if (error == Errors.NONE) { if (error == Errors.NONE) {

View File

@ -761,7 +761,8 @@ class GroupCoordinatorAdapterTest {
) )
)), )),
capturedCallback.capture(), capturedCallback.capture(),
ArgumentMatchers.eq(RequestLocal(bufferSupplier)) ArgumentMatchers.eq(RequestLocal(bufferSupplier)),
ArgumentMatchers.any()
) )
capturedCallback.getValue.apply(Map( capturedCallback.getValue.apply(Map(

View File

@ -24,13 +24,13 @@ import kafka.common.OffsetAndMetadata
import kafka.coordinator.AbstractCoordinatorConcurrencyTest import kafka.coordinator.AbstractCoordinatorConcurrencyTest
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._ import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, KafkaRequestHandler} import kafka.server.{DelayedOperationPurgatory, KafkaConfig, KafkaRequestHandler, RequestLocal}
import kafka.utils.CoreUtils import kafka.utils.CoreUtils
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse} import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse}
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
@ -319,7 +319,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
// Since the replica manager is mocked we can use a dummy value for transactionalId. // Since the replica manager is mocked we can use a dummy value for transactionalId.
groupCoordinator.handleTxnCommitOffsets(member.group.groupId, "dummy-txn-id", producerId, producerEpoch, groupCoordinator.handleTxnCommitOffsets(member.group.groupId, "dummy-txn-id", producerId, producerEpoch,
JoinGroupRequest.UNKNOWN_MEMBER_ID, Option.empty, JoinGroupRequest.UNKNOWN_GENERATION_ID, JoinGroupRequest.UNKNOWN_MEMBER_ID, Option.empty, JoinGroupRequest.UNKNOWN_GENERATION_ID,
offsets, callbackWithTxnCompletion) offsets, callbackWithTxnCompletion, RequestLocal.NoCaching, ApiKeys.TXN_OFFSET_COMMIT.latestVersion())
replicaManager.tryCompleteActions() replicaManager.tryCompleteActions()
} finally lock.foreach(_.unlock()) } finally lock.foreach(_.unlock())
} }

View File

@ -22,7 +22,7 @@ import kafka.common.OffsetAndMetadata
import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition, KafkaConfig, KafkaRequestHandler, ReplicaManager, RequestLocal} import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition, KafkaConfig, KafkaRequestHandler, ReplicaManager, RequestLocal}
import kafka.utils._ import kafka.utils._
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult} import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult}
@ -4173,7 +4173,8 @@ class GroupCoordinatorTest {
ArgumentMatchers.eq(producerId), ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch), ArgumentMatchers.eq(producerEpoch),
any(), any(),
postVerificationCallback.capture() postVerificationCallback.capture(),
any()
)).thenAnswer( )).thenAnswer(
_ => postVerificationCallback.getValue()((verificationError, VerificationGuard.SENTINEL)) _ => postVerificationCallback.getValue()((verificationError, VerificationGuard.SENTINEL))
) )
@ -4198,7 +4199,7 @@ class GroupCoordinatorTest {
when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V2)) when(replicaManager.getMagic(any[TopicPartition])).thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
groupCoordinator.handleTxnCommitOffsets(groupId, transactionalId, producerId, producerEpoch, groupCoordinator.handleTxnCommitOffsets(groupId, transactionalId, producerId, producerEpoch,
memberId, groupInstanceId, generationId, offsets, responseCallback) memberId, groupInstanceId, generationId, offsets, responseCallback, RequestLocal.NoCaching, ApiKeys.TXN_OFFSET_COMMIT.latestVersion())
val result = Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS)) val result = Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
result result
} }

View File

@ -283,7 +283,7 @@ class TransactionCoordinatorTest {
coordinator.handleVerifyPartitionsInTransaction(transactionalId, 0L, 0, partitions, verifyPartitionsInTxnCallback) coordinator.handleVerifyPartitionsInTransaction(transactionalId, 0L, 0, partitions, verifyPartitionsInTxnCallback)
errors.foreach { case (_, error) => errors.foreach { case (_, error) =>
assertEquals(Errors.INVALID_TXN_STATE, error) assertEquals(Errors.ABORTABLE_TRANSACTION, error)
} }
} }
@ -399,7 +399,7 @@ class TransactionCoordinatorTest {
val extraPartitions = partitions ++ Set(new TopicPartition("topic2", 0)) val extraPartitions = partitions ++ Set(new TopicPartition("topic2", 0))
coordinator.handleVerifyPartitionsInTransaction(transactionalId, 0L, 0, extraPartitions, verifyPartitionsInTxnCallback) coordinator.handleVerifyPartitionsInTransaction(transactionalId, 0L, 0, extraPartitions, verifyPartitionsInTxnCallback)
assertEquals(Errors.INVALID_TXN_STATE, errors(new TopicPartition("topic2", 0))) assertEquals(Errors.ABORTABLE_TRANSACTION, errors(new TopicPartition("topic2", 0)))
assertEquals(Errors.NONE, errors(new TopicPartition("topic1", 0))) assertEquals(Errors.NONE, errors(new TopicPartition("topic1", 0)))
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId)) verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
} }

View File

@ -71,6 +71,7 @@ class AddPartitionsToTxnManagerTest {
private val authenticationErrorResponse = clientResponse(null, authException = new SaslAuthenticationException("")) private val authenticationErrorResponse = clientResponse(null, authException = new SaslAuthenticationException(""))
private val versionMismatchResponse = clientResponse(null, mismatchException = new UnsupportedVersionException("")) private val versionMismatchResponse = clientResponse(null, mismatchException = new UnsupportedVersionException(""))
private val disconnectedResponse = clientResponse(null, disconnected = true) private val disconnectedResponse = clientResponse(null, disconnected = true)
private val supportedOperation = genericError
private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181")) private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:2181"))
@ -106,9 +107,9 @@ class AddPartitionsToTxnManagerTest {
val transaction2Errors = mutable.Map[TopicPartition, Errors]() val transaction2Errors = mutable.Map[TopicPartition, Errors]()
val transaction3Errors = mutable.Map[TopicPartition, Errors]() val transaction3Errors = mutable.Map[TopicPartition, Errors]()
addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1Errors)) addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1Errors), supportedOperation)
addPartitionsToTxnManager.verifyTransaction(transactionalId2, producerId2, producerEpoch = 0, topicPartitions, setErrors(transaction2Errors)) addPartitionsToTxnManager.verifyTransaction(transactionalId2, producerId2, producerEpoch = 0, topicPartitions, setErrors(transaction2Errors), supportedOperation)
addPartitionsToTxnManager.verifyTransaction(transactionalId3, producerId3, producerEpoch = 0, topicPartitions, setErrors(transaction3Errors)) addPartitionsToTxnManager.verifyTransaction(transactionalId3, producerId3, producerEpoch = 0, topicPartitions, setErrors(transaction3Errors), supportedOperation)
// We will try to add transaction1 3 more times (retries). One will have the same epoch, one will have a newer epoch, and one will have an older epoch than the new one we just added. // We will try to add transaction1 3 more times (retries). One will have the same epoch, one will have a newer epoch, and one will have an older epoch than the new one we just added.
val transaction1RetryWithSameEpochErrors = mutable.Map[TopicPartition, Errors]() val transaction1RetryWithSameEpochErrors = mutable.Map[TopicPartition, Errors]()
@ -116,17 +117,17 @@ class AddPartitionsToTxnManagerTest {
val transaction1RetryWithOldEpochErrors = mutable.Map[TopicPartition, Errors]() val transaction1RetryWithOldEpochErrors = mutable.Map[TopicPartition, Errors]()
// Trying to add more transactional data for the same transactional ID, producer ID, and epoch should simply replace the old data and send a retriable response. // Trying to add more transactional data for the same transactional ID, producer ID, and epoch should simply replace the old data and send a retriable response.
addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1RetryWithSameEpochErrors)) addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1RetryWithSameEpochErrors), supportedOperation)
val expectedNetworkErrors = topicPartitions.map(_ -> Errors.NETWORK_EXCEPTION).toMap val expectedNetworkErrors = topicPartitions.map(_ -> Errors.NETWORK_EXCEPTION).toMap
assertEquals(expectedNetworkErrors, transaction1Errors) assertEquals(expectedNetworkErrors, transaction1Errors)
// Trying to add more transactional data for the same transactional ID and producer ID, but new epoch should replace the old data and send an error response for it. // Trying to add more transactional data for the same transactional ID and producer ID, but new epoch should replace the old data and send an error response for it.
addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 1, topicPartitions, setErrors(transaction1RetryWithNewerEpochErrors)) addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 1, topicPartitions, setErrors(transaction1RetryWithNewerEpochErrors), supportedOperation)
val expectedEpochErrors = topicPartitions.map(_ -> Errors.INVALID_PRODUCER_EPOCH).toMap val expectedEpochErrors = topicPartitions.map(_ -> Errors.INVALID_PRODUCER_EPOCH).toMap
assertEquals(expectedEpochErrors, transaction1RetryWithSameEpochErrors) assertEquals(expectedEpochErrors, transaction1RetryWithSameEpochErrors)
// Trying to add more transactional data for the same transactional ID and producer ID, but an older epoch should immediately return with error and keep the old data queued to send. // Trying to add more transactional data for the same transactional ID and producer ID, but an older epoch should immediately return with error and keep the old data queued to send.
addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1RetryWithOldEpochErrors)) addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1RetryWithOldEpochErrors), supportedOperation)
assertEquals(expectedEpochErrors, transaction1RetryWithOldEpochErrors) assertEquals(expectedEpochErrors, transaction1RetryWithOldEpochErrors)
val requestsAndHandlers = addPartitionsToTxnManager.generateRequests().asScala val requestsAndHandlers = addPartitionsToTxnManager.generateRequests().asScala
@ -159,8 +160,8 @@ class AddPartitionsToTxnManagerTest {
val transactionErrors = mutable.Map[TopicPartition, Errors]() val transactionErrors = mutable.Map[TopicPartition, Errors]()
addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transactionErrors)) addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), supportedOperation)
addPartitionsToTxnManager.verifyTransaction(transactionalId2, producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors)) addPartitionsToTxnManager.verifyTransaction(transactionalId2, producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), supportedOperation)
val requestsAndHandlers = addPartitionsToTxnManager.generateRequests().asScala val requestsAndHandlers = addPartitionsToTxnManager.generateRequests().asScala
assertEquals(2, requestsAndHandlers.size) assertEquals(2, requestsAndHandlers.size)
@ -173,8 +174,8 @@ class AddPartitionsToTxnManagerTest {
} }
} }
addPartitionsToTxnManager.verifyTransaction(transactionalId2, producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors)) addPartitionsToTxnManager.verifyTransaction(transactionalId2, producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), supportedOperation)
addPartitionsToTxnManager.verifyTransaction(transactionalId3, producerId3, producerEpoch = 0, topicPartitions, setErrors(transactionErrors)) addPartitionsToTxnManager.verifyTransaction(transactionalId3, producerId3, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), supportedOperation)
// Test creationTimeMs increases too. // Test creationTimeMs increases too.
time.sleep(10) time.sleep(10)
@ -207,7 +208,8 @@ class AddPartitionsToTxnManagerTest {
producerId1, producerId1,
producerEpoch = 0, producerEpoch = 0,
topicPartitions, topicPartitions,
setErrors(errors) setErrors(errors),
supportedOperation
) )
assertEquals(topicPartitions.map(tp => tp -> Errors.COORDINATOR_NOT_AVAILABLE).toMap, errors) assertEquals(topicPartitions.map(tp => tp -> Errors.COORDINATOR_NOT_AVAILABLE).toMap, errors)
@ -240,8 +242,16 @@ class AddPartitionsToTxnManagerTest {
transaction1Errors.clear() transaction1Errors.clear()
transaction2Errors.clear() transaction2Errors.clear()
addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1Errors)) addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1Errors), supportedOperation)
addPartitionsToTxnManager.verifyTransaction(transactionalId2, producerId2, producerEpoch = 0, topicPartitions, setErrors(transaction2Errors)) addPartitionsToTxnManager.verifyTransaction(transactionalId2, producerId2, producerEpoch = 0, topicPartitions, setErrors(transaction2Errors), supportedOperation)
}
def addTransactionsToVerifyRequestVersion(operationExpected: SupportedOperation): Unit = {
transaction1Errors.clear()
transaction2Errors.clear()
addPartitionsToTxnManager.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transaction1Errors), operationExpected)
addPartitionsToTxnManager.verifyTransaction(transactionalId2, producerId2, producerEpoch = 0, topicPartitions, setErrors(transaction2Errors), operationExpected)
} }
val expectedAuthErrors = topicPartitions.map(_ -> Errors.SASL_AUTHENTICATION_FAILED).toMap val expectedAuthErrors = topicPartitions.map(_ -> Errors.SASL_AUTHENTICATION_FAILED).toMap
@ -288,6 +298,32 @@ class AddPartitionsToTxnManagerTest {
receiveResponse(mixedErrorsResponse) receiveResponse(mixedErrorsResponse)
assertEquals(expectedTransaction1Errors, transaction1Errors) assertEquals(expectedTransaction1Errors, transaction1Errors)
assertEquals(expectedTransaction2Errors, transaction2Errors) assertEquals(expectedTransaction2Errors, transaction2Errors)
val preConvertedAbortableTransaction1Errors = topicPartitions.map(_ -> Errors.ABORTABLE_TRANSACTION).toMap
val preConvertedAbortableTransaction2Errors = Map(new TopicPartition("foo", 1) -> Errors.NONE,
new TopicPartition("foo", 2) -> Errors.ABORTABLE_TRANSACTION,
new TopicPartition("foo", 3) -> Errors.NONE)
val abortableTransaction1ErrorResponse = AddPartitionsToTxnResponse.resultForTransaction(transactionalId1, preConvertedAbortableTransaction1Errors.asJava)
val abortableTransaction2ErrorResponse = AddPartitionsToTxnResponse.resultForTransaction(transactionalId2, preConvertedAbortableTransaction2Errors.asJava)
val mixedErrorsAddPartitionsResponseAbortableError = new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData()
.setResultsByTransaction(new AddPartitionsToTxnResultCollection(Seq(abortableTransaction1ErrorResponse, abortableTransaction2ErrorResponse).iterator.asJava)))
val mixedAbortableErrorsResponse = clientResponse(mixedErrorsAddPartitionsResponseAbortableError)
val expectedAbortableTransaction1ErrorsLowerVersion = topicPartitions.map(_ -> Errors.INVALID_TXN_STATE).toMap
val expectedAbortableTransaction2ErrorsLowerVersion = Map(new TopicPartition("foo", 2) -> Errors.INVALID_TXN_STATE)
val expectedAbortableTransaction1ErrorsHigherVersion = topicPartitions.map(_ -> Errors.ABORTABLE_TRANSACTION).toMap
val expectedAbortableTransaction2ErrorsHigherVersion = Map(new TopicPartition("foo", 2) -> Errors.ABORTABLE_TRANSACTION)
addTransactionsToVerifyRequestVersion(defaultError)
receiveResponse(mixedAbortableErrorsResponse)
assertEquals(expectedAbortableTransaction1ErrorsLowerVersion, transaction1Errors)
assertEquals(expectedAbortableTransaction2ErrorsLowerVersion, transaction2Errors)
addTransactionsToVerifyRequestVersion(genericError)
receiveResponse(mixedAbortableErrorsResponse)
assertEquals(expectedAbortableTransaction1ErrorsHigherVersion, transaction1Errors)
assertEquals(expectedAbortableTransaction2ErrorsHigherVersion, transaction2Errors)
} }
@Test @Test
@ -325,8 +361,8 @@ class AddPartitionsToTxnManagerTest {
) )
try { try {
addPartitionsManagerWithMockedMetrics.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transactionErrors)) addPartitionsManagerWithMockedMetrics.verifyTransaction(transactionalId1, producerId1, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), supportedOperation)
addPartitionsManagerWithMockedMetrics.verifyTransaction(transactionalId2, producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors)) addPartitionsManagerWithMockedMetrics.verifyTransaction(transactionalId2, producerId2, producerEpoch = 0, topicPartitions, setErrors(transactionErrors), supportedOperation)
time.sleep(100) time.sleep(100)

View File

@ -163,7 +163,7 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
val verifyErrors = verifyResponse.errors() val verifyErrors = verifyResponse.errors()
assertEquals(Collections.singletonMap(transactionalId, Collections.singletonMap(tp0, Errors.INVALID_TXN_STATE)), verifyErrors) assertEquals(Collections.singletonMap(transactionalId, Collections.singletonMap(tp0, Errors.ABORTABLE_TRANSACTION)), verifyErrors)
} }
private def setUpTransactions(transactionalId: String, verifyOnly: Boolean, partitions: Set[TopicPartition]): (Int, AddPartitionsToTxnTransaction) = { private def setUpTransactions(transactionalId: String, verifyOnly: Boolean, partitions: Set[TopicPartition]): (Int, AddPartitionsToTxnTransaction) = {

View File

@ -2487,6 +2487,7 @@ class KafkaApisTest extends Logging {
responseCallback.capture(), responseCallback.capture(),
any(), any(),
any(), any(),
any(),
any() any()
)).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH)))) )).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))))
@ -2548,6 +2549,7 @@ class KafkaApisTest extends Logging {
responseCallback.capture(), responseCallback.capture(),
any(), any(),
any(), any(),
any(),
any()) any())
).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)))) ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
@ -2612,6 +2614,7 @@ class KafkaApisTest extends Logging {
responseCallback.capture(), responseCallback.capture(),
any(), any(),
any(), any(),
any(),
any()) any())
).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)))) ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
@ -2675,6 +2678,7 @@ class KafkaApisTest extends Logging {
responseCallback.capture(), responseCallback.capture(),
any(), any(),
any(), any(),
any(),
any()) any())
).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)))) ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
@ -2742,6 +2746,7 @@ class KafkaApisTest extends Logging {
any(), any(),
any(), any(),
any(), any(),
any(),
any()) any())
} finally { } finally {
kafkaApis.close() kafkaApis.close()

View File

@ -112,6 +112,7 @@ class ReplicaManagerTest {
private var mockRemoteLogManager: RemoteLogManager = _ private var mockRemoteLogManager: RemoteLogManager = _
private var addPartitionsToTxnManager: AddPartitionsToTxnManager = _ private var addPartitionsToTxnManager: AddPartitionsToTxnManager = _
private var brokerTopicStats: BrokerTopicStats = _ private var brokerTopicStats: BrokerTopicStats = _
private val supportedOperation = genericError
// Constants defined for readability // Constants defined for readability
private val zkVersion = 0 private val zkVersion = 0
@ -132,7 +133,7 @@ class ReplicaManagerTest {
addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
// Anytime we try to verify, just automatically run the callback as though the transaction was verified. // Anytime we try to verify, just automatically run the callback as though the transaction was verified.
when(addPartitionsToTxnManager.verifyTransaction(any(), any(), any(), any(), any())).thenAnswer { invocationOnMock => when(addPartitionsToTxnManager.verifyTransaction(any(), any(), any(), any(), any(), any())).thenAnswer { invocationOnMock =>
val callback = invocationOnMock.getArgument(4, classOf[AddPartitionsToTxnManager.AppendCallback]) val callback = invocationOnMock.getArgument(4, classOf[AddPartitionsToTxnManager.AppendCallback])
callback(Map.empty[TopicPartition, Errors].toMap) callback(Map.empty[TopicPartition, Errors].toMap)
} }
@ -2182,7 +2183,7 @@ class ReplicaManagerTest {
val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, val idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
new SimpleRecord("message".getBytes)) new SimpleRecord("message".getBytes))
handleProduceAppend(replicaManager, tp0, idempotentRecords, transactionalId = null) handleProduceAppend(replicaManager, tp0, idempotentRecords, transactionalId = null)
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any[AddPartitionsToTxnManager.AppendCallback]()) verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any[AddPartitionsToTxnManager.AppendCallback](), any())
assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId)) assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
// If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records. // If we supply a transactional ID and some transactional and some idempotent records, we should only verify the topic partition with transactional records.
@ -2197,7 +2198,8 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(producerId), ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch), ArgumentMatchers.eq(producerEpoch),
ArgumentMatchers.eq(Seq(tp0)), ArgumentMatchers.eq(Seq(tp0)),
any[AddPartitionsToTxnManager.AppendCallback]() any[AddPartitionsToTxnManager.AppendCallback](),
any()
) )
assertNotEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId)) assertNotEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp1, producerId)) assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp1, producerId))
@ -2233,7 +2235,8 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(producerId), ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch), ArgumentMatchers.eq(producerEpoch),
ArgumentMatchers.eq(Seq(tp0)), ArgumentMatchers.eq(Seq(tp0)),
appendCallback.capture() appendCallback.capture(),
any()
) )
val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId) val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId)
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
@ -2252,7 +2255,8 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(producerId), ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch), ArgumentMatchers.eq(producerEpoch),
ArgumentMatchers.eq(Seq(tp0)), ArgumentMatchers.eq(Seq(tp0)),
appendCallback2.capture() appendCallback2.capture(),
any()
) )
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
@ -2291,7 +2295,8 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(producerId), ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch), ArgumentMatchers.eq(producerEpoch),
ArgumentMatchers.eq(Seq(tp0)), ArgumentMatchers.eq(Seq(tp0)),
appendCallback.capture() appendCallback.capture(),
any()
) )
val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId) val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId)
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
@ -2313,7 +2318,8 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(producerId), ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch), ArgumentMatchers.eq(producerEpoch),
ArgumentMatchers.eq(Seq(tp0)), ArgumentMatchers.eq(Seq(tp0)),
appendCallback2.capture() appendCallback2.capture(),
any()
) )
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
@ -2391,7 +2397,7 @@ class ReplicaManagerTest {
assertThrows(classOf[InvalidPidMappingException], assertThrows(classOf[InvalidPidMappingException],
() => handleProduceAppendToMultipleTopics(replicaManager, transactionalRecords, transactionalId = transactionalId)) () => handleProduceAppendToMultipleTopics(replicaManager, transactionalRecords, transactionalId = transactionalId))
// We should not add these partitions to the manager to verify. // We should not add these partitions to the manager to verify.
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any()) verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any(), any())
} finally { } finally {
replicaManager.shutdown(checkpointHW = false) replicaManager.shutdown(checkpointHW = false)
} }
@ -2415,7 +2421,7 @@ class ReplicaManagerTest {
handleProduceAppend(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId).onFire { response => handleProduceAppend(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId).onFire { response =>
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, response.error) assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, response.error)
} }
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any()) verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any(), any())
} finally { } finally {
replicaManager.shutdown(checkpointHW = false) replicaManager.shutdown(checkpointHW = false)
} }
@ -2449,7 +2455,7 @@ class ReplicaManagerTest {
assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp, producerId)) assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp, producerId))
// We should not add these partitions to the manager to verify. // We should not add these partitions to the manager to verify.
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any()) verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any(), any())
// Dynamically enable verification. // Dynamically enable verification.
config.dynamicConfig.initialize(None, None) config.dynamicConfig.initialize(None, None)
@ -2463,7 +2469,7 @@ class ReplicaManagerTest {
new SimpleRecord("message".getBytes)) new SimpleRecord("message".getBytes))
handleProduceAppend(replicaManager, tp, moreTransactionalRecords, transactionalId = transactionalId) handleProduceAppend(replicaManager, tp, moreTransactionalRecords, transactionalId = transactionalId)
verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any()) verify(addPartitionsToTxnManager, times(0)).verifyTransaction(any(), any(), any(), any(), any(), any())
assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp, producerId)) assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp, producerId))
assertTrue(replicaManager.localLog(tp).get.hasOngoingTransaction(producerId)) assertTrue(replicaManager.localLog(tp).get.hasOngoingTransaction(producerId))
} finally { } finally {
@ -2497,7 +2503,8 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(producerId), ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch), ArgumentMatchers.eq(producerEpoch),
ArgumentMatchers.eq(Seq(tp0)), ArgumentMatchers.eq(Seq(tp0)),
appendCallback.capture() appendCallback.capture(),
any()
) )
val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId) val verificationGuard = getVerificationGuard(replicaManager, tp0, producerId)
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId)) assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
@ -2517,7 +2524,7 @@ class ReplicaManagerTest {
// This time we do not verify // This time we do not verify
handleProduceAppend(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId) handleProduceAppend(replicaManager, tp0, transactionalRecords, transactionalId = transactionalId)
verify(addPartitionsToTxnManager, times(1)).verifyTransaction(any(), any(), any(), any(), any()) verify(addPartitionsToTxnManager, times(1)).verifyTransaction(any(), any(), any(), any(), any(), any())
assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId)) assertEquals(VerificationGuard.SENTINEL, getVerificationGuard(replicaManager, tp0, producerId))
assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId)) assertTrue(replicaManager.localLog(tp0).get.hasOngoingTransaction(producerId))
} finally { } finally {
@ -2552,7 +2559,8 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(producerId), ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch), ArgumentMatchers.eq(producerEpoch),
ArgumentMatchers.eq(Seq(tp0)), ArgumentMatchers.eq(Seq(tp0)),
appendCallback.capture() appendCallback.capture(),
any()
) )
// Confirm we did not write to the log and instead returned the converted error with the correct error message. // Confirm we did not write to the log and instead returned the converted error with the correct error message.
@ -2582,7 +2590,8 @@ class ReplicaManagerTest {
ArgumentMatchers.eq(producerId), ArgumentMatchers.eq(producerId),
ArgumentMatchers.eq(producerEpoch), ArgumentMatchers.eq(producerEpoch),
ArgumentMatchers.eq(Seq(tp0)), ArgumentMatchers.eq(Seq(tp0)),
appendCallback.capture() appendCallback.capture(),
any()
) )
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, result.assertFired.left.getOrElse(Errors.NONE)) assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, result.assertFired.left.getOrElse(Errors.NONE))
} finally { } finally {
@ -3040,6 +3049,7 @@ class ReplicaManagerTest {
transactionalId = transactionalId, transactionalId = transactionalId,
entriesPerPartition = entriesToAppend, entriesPerPartition = entriesToAppend,
responseCallback = appendCallback, responseCallback = appendCallback,
supportedOperation = supportedOperation
) )
result result
@ -3067,6 +3077,7 @@ class ReplicaManagerTest {
transactionalId = transactionalId, transactionalId = transactionalId,
entriesPerPartition = entriesPerPartition, entriesPerPartition = entriesPerPartition,
responseCallback = appendCallback, responseCallback = appendCallback,
supportedOperation = supportedOperation
) )
result result
@ -3091,7 +3102,8 @@ class ReplicaManagerTest {
producerId, producerId,
producerEpoch, producerEpoch,
baseSequence, baseSequence,
postVerificationCallback postVerificationCallback,
supportedOperation
) )
result result
} }

View File

@ -858,7 +858,8 @@ public class GroupCoordinatorService implements GroupCoordinator {
request.producerId(), request.producerId(),
request.producerEpoch(), request.producerEpoch(),
Duration.ofMillis(config.offsetCommitTimeoutMs), Duration.ofMillis(config.offsetCommitTimeoutMs),
coordinator -> coordinator.commitTransactionalOffset(context, request) coordinator -> coordinator.commitTransactionalOffset(context, request),
context.apiVersion()
).exceptionally(exception -> handleOperationException( ).exceptionally(exception -> handleOperationException(
"txn-commit-offset", "txn-commit-offset",
request, request,

View File

@ -1482,6 +1482,7 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
* @param producerEpoch The producer epoch. * @param producerEpoch The producer epoch.
* @param timeout The write operation timeout. * @param timeout The write operation timeout.
* @param op The write operation. * @param op The write operation.
* @param apiVersion The Version of the Txn_Offset_Commit request
* *
* @return A future that will be completed with the result of the write operation * @return A future that will be completed with the result of the write operation
* when the operation is completed or an exception if the write operation failed. * when the operation is completed or an exception if the write operation failed.
@ -1495,7 +1496,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
long producerId, long producerId,
short producerEpoch, short producerEpoch,
Duration timeout, Duration timeout,
CoordinatorWriteOperation<S, T, U> op CoordinatorWriteOperation<S, T, U> op,
Short apiVersion
) { ) {
throwIfNotRunning(); throwIfNotRunning();
log.debug("Scheduled execution of transactional write operation {}.", name); log.debug("Scheduled execution of transactional write operation {}.", name);
@ -1503,7 +1505,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
tp, tp,
transactionalId, transactionalId,
producerId, producerId,
producerEpoch producerEpoch,
apiVersion
).thenCompose(verificationGuard -> { ).thenCompose(verificationGuard -> {
CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>( CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(
name, name,

View File

@ -128,6 +128,7 @@ public interface PartitionWriter<T> {
* @param transactionalId The transactional id. * @param transactionalId The transactional id.
* @param producerId The producer id. * @param producerId The producer id.
* @param producerEpoch The producer epoch. * @param producerEpoch The producer epoch.
* @param apiVersion The version of the Request used.
* @return A future failed with any error encountered; or the {@link VerificationGuard} * @return A future failed with any error encountered; or the {@link VerificationGuard}
* if the transaction required verification and {@link VerificationGuard#SENTINEL} * if the transaction required verification and {@link VerificationGuard#SENTINEL}
* if it did not. * if it did not.
@ -137,6 +138,7 @@ public interface PartitionWriter<T> {
TopicPartition tp, TopicPartition tp,
String transactionalId, String transactionalId,
long producerId, long producerId,
short producerEpoch short producerEpoch,
short apiVersion
) throws KafkaException; ) throws KafkaException;
} }

View File

@ -1887,6 +1887,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq(10L), ArgumentMatchers.eq(10L),
ArgumentMatchers.eq((short) 5), ArgumentMatchers.eq((short) 5),
ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any(),
ArgumentMatchers.any() ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(response)); )).thenReturn(CompletableFuture.completedFuture(response));
@ -1937,6 +1938,7 @@ public class GroupCoordinatorServiceTest {
ArgumentMatchers.eq(10L), ArgumentMatchers.eq(10L),
ArgumentMatchers.eq((short) 5), ArgumentMatchers.eq((short) 5),
ArgumentMatchers.eq(Duration.ofMillis(5000)), ArgumentMatchers.eq(Duration.ofMillis(5000)),
ArgumentMatchers.any(),
ArgumentMatchers.any() ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(new CompletionException(Errors.NOT_ENOUGH_REPLICAS.exception()))); )).thenReturn(FutureUtils.failedFuture(new CompletionException(Errors.NOT_ENOUGH_REPLICAS.exception())));

View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorException; import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.requests.TransactionResult;
@ -87,6 +88,8 @@ public class CoordinatorRuntimeTest {
private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0); private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5); private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
private static final short TXN_OFFSET_COMMIT_LATEST_VERSION = ApiKeys.TXN_OFFSET_COMMIT.latestVersion();
/** /**
* A CoordinatorEventProcessor that directly executes the operations. This is * A CoordinatorEventProcessor that directly executes the operations. This is
* useful in unit tests where execution in threads is not required. * useful in unit tests where execution in threads is not required.
@ -1246,7 +1249,8 @@ public class CoordinatorRuntimeTest {
TP, TP,
"transactional-id", "transactional-id",
100L, 100L,
(short) 50 (short) 50,
TXN_OFFSET_COMMIT_LATEST_VERSION
)).thenReturn(CompletableFuture.completedFuture(guard)); )).thenReturn(CompletableFuture.completedFuture(guard));
// Schedule a transactional write. // Schedule a transactional write.
@ -1257,7 +1261,8 @@ public class CoordinatorRuntimeTest {
100L, 100L,
(short) 50, (short) 50,
Duration.ofMillis(5000), Duration.ofMillis(5000),
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response") state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response"),
TXN_OFFSET_COMMIT_LATEST_VERSION
); );
// Verify that the writer got the records with the correct // Verify that the writer got the records with the correct
@ -1328,7 +1333,8 @@ public class CoordinatorRuntimeTest {
TP, TP,
"transactional-id", "transactional-id",
100L, 100L,
(short) 50 (short) 50,
TXN_OFFSET_COMMIT_LATEST_VERSION
)).thenReturn(FutureUtils.failedFuture(Errors.NOT_ENOUGH_REPLICAS.exception())); )).thenReturn(FutureUtils.failedFuture(Errors.NOT_ENOUGH_REPLICAS.exception()));
// Schedule a transactional write. // Schedule a transactional write.
@ -1339,7 +1345,8 @@ public class CoordinatorRuntimeTest {
100L, 100L,
(short) 50, (short) 50,
Duration.ofMillis(5000), Duration.ofMillis(5000),
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response") state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response"),
TXN_OFFSET_COMMIT_LATEST_VERSION
); );
// Verify that the future is failed with the expected exception. // Verify that the future is failed with the expected exception.
@ -1391,7 +1398,8 @@ public class CoordinatorRuntimeTest {
100L, 100L,
(short) 5, (short) 5,
DEFAULT_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1") state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"),
TXN_OFFSET_COMMIT_LATEST_VERSION
); );
// Verify that the write is not committed yet. // Verify that the write is not committed yet.
@ -1556,7 +1564,9 @@ public class CoordinatorRuntimeTest {
100L, 100L,
(short) 5, (short) 5,
DEFAULT_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"),
TXN_OFFSET_COMMIT_LATEST_VERSION
);
// Verify that the state has been updated. // Verify that the state has been updated.
assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(2L, ctx.coordinator.lastWrittenOffset());
@ -1638,7 +1648,9 @@ public class CoordinatorRuntimeTest {
100L, 100L,
(short) 5, (short) 5,
DEFAULT_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1")); state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"),
TXN_OFFSET_COMMIT_LATEST_VERSION
);
// Verify that the state has been updated. // Verify that the state has been updated.
assertEquals(2L, ctx.coordinator.lastWrittenOffset()); assertEquals(2L, ctx.coordinator.lastWrittenOffset());

View File

@ -274,7 +274,8 @@ public class InMemoryPartitionWriter<T> implements PartitionWriter<T> {
TopicPartition tp, TopicPartition tp,
String transactionalId, String transactionalId,
long producerId, long producerId,
short producerEpoch short producerEpoch,
short apiVersion
) throws KafkaException { ) throws KafkaException {
return CompletableFuture.completedFuture(new VerificationGuard()); return CompletableFuture.completedFuture(new VerificationGuard());
} }