KAFKA-9911: Add new PRODUCER_FENCED error code (#8549)

Add a separate error code as PRODUCER_FENCED to differentiate INVALID_PRODUCER_EPOCH. On broker side, replace INVALID_PRODUCER_EPOCH with PRODUCER_FENCED when the request version is the latest, while still returning INVALID_PRODUCER_EPOCH to older clients. On client side, simply handling INVALID_PRODUCER_EPOCH the same as PRODUCER_FENCED if from txn coordinator APIs.

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Boyang Chen 2020-08-12 08:54:01 -07:00 committed by GitHub
parent 89e12f3c6b
commit b937ec7567
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 570 additions and 106 deletions

View File

@ -379,7 +379,7 @@ public class TransactionManager {
EndTxnHandler handler = new EndTxnHandler(builder);
enqueueRequest(handler);
if (!shouldBumpEpoch()) {
if (!epochBumpRequired) {
return handler.result;
}
}
@ -553,10 +553,6 @@ public class TransactionManager {
this.partitionsToRewriteSequences.add(tp);
}
private boolean shouldBumpEpoch() {
return epochBumpRequired;
}
private void bumpIdempotentProducerEpoch() {
if (this.producerIdAndEpoch.epoch == Short.MAX_VALUE) {
resetIdempotentProducerId();
@ -577,7 +573,7 @@ public class TransactionManager {
synchronized void bumpIdempotentEpochAndResetIdIfNeeded() {
if (!isTransactional()) {
if (shouldBumpEpoch()) {
if (epochBumpRequired) {
bumpIdempotentProducerEpoch();
}
if (currentState != State.INITIALIZING && !hasProducerId()) {
@ -1014,10 +1010,14 @@ public class TransactionManager {
if (!isTransactional()) {
// For the idempotent producer, always retry UNKNOWN_PRODUCER_ID errors. If the batch has the current
// producer ID and epoch, request a bump of the epoch. Otherwise just retry, as the
// producer ID and epoch, request a bump of the epoch. Otherwise just retry the produce.
requestEpochBumpForPartition(batch.topicPartition);
return true;
}
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
// Retry the initProducerId to bump the epoch and continue.
requestEpochBumpForPartition(batch.topicPartition);
return true;
} else if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER) {
if (!hasUnresolvedSequence(batch.topicPartition) &&
(batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence()))) {
@ -1366,6 +1366,10 @@ public class TransactionManager {
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
fatalError(error.exception());
} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
// just treat it the same as PRODUCE_FENCED.
fatalError(Errors.PRODUCER_FENCED.exception());
} else {
fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
}
@ -1417,8 +1421,10 @@ public class TransactionManager {
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
reenqueue();
return;
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
fatalError(error.exception());
} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
// just treat it the same as PRODUCE_FENCED.
fatalError(Errors.PRODUCER_FENCED.exception());
return;
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
fatalError(error.exception());
@ -1575,8 +1581,10 @@ public class TransactionManager {
reenqueue();
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
reenqueue();
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
fatalError(error.exception());
} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
// just treat it the same as PRODUCE_FENCED.
fatalError(Errors.PRODUCER_FENCED.exception());
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
fatalError(error.exception());
} else if (error == Errors.INVALID_TXN_STATE) {
@ -1632,8 +1640,10 @@ public class TransactionManager {
reenqueue();
} else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
abortableErrorIfPossible(error.exception());
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
fatalError(error.exception());
} else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
// just treat it the same as PRODUCE_FENCED.
fatalError(Errors.PRODUCER_FENCED.exception());
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
fatalError(error.exception());
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {

View File

@ -17,7 +17,7 @@
package org.apache.kafka.common.errors;
/**
* A retryable exception is a transient exception that if retried may succeed.
* A retriable exception is a transient exception that if retried may succeed.
*/
public abstract class RetriableException extends ApiException {

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.internals;
import org.apache.kafka.common.errors.RetriableException;
/**
* This exception indicates that the produce request sent to the partition leader
* contains a non-matching producer epoch. When encountering this exception, the ongoing transaction
* will be aborted and can be retried.
*/
public class InvalidProducerEpochException extends RetriableException {
private static final long serialVersionUID = 1L;
public InvalidProducerEpochException(String message) {
super(message);
}
}

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.DelegationTokenExpiredException;
import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.internals.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ListenerNotFoundException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
@ -228,9 +229,8 @@ public enum Errors {
OutOfOrderSequenceException::new),
DUPLICATE_SEQUENCE_NUMBER(46, "The broker received a duplicate sequence number.",
DuplicateSequenceException::new),
INVALID_PRODUCER_EPOCH(47, "Producer attempted an operation with an old epoch. Either there is a newer producer " +
"with the same transactionalId, or the producer's transaction has been expired by the broker.",
ProducerFencedException::new),
INVALID_PRODUCER_EPOCH(47, "Producer attempted to produce with an old epoch.",
InvalidProducerEpochException::new),
INVALID_TXN_STATE(48, "The producer attempted a transactional operation in an invalid state.",
InvalidTxnStateException::new),
INVALID_PRODUCER_ID_MAPPING(49, "The producer attempted to use a producer id which is not currently assigned to " +
@ -320,10 +320,12 @@ public enum Errors {
NO_REASSIGNMENT_IN_PROGRESS(85, "No partition reassignment is in progress.",
NoReassignmentInProgressException::new),
GROUP_SUBSCRIBED_TO_TOPIC(86, "Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.",
GroupSubscribedToTopicException::new),
GroupSubscribedToTopicException::new),
INVALID_RECORD(87, "This record has failed the validation on broker and hence will be rejected.", InvalidRecordException::new),
UNSTABLE_OFFSET_COMMIT(88, "There are unstable offsets that need to be cleared.", UnstableOffsetCommitException::new),
THROTTLING_QUOTA_EXCEEDED(89, "The throttling quota has been exceeded.", ThrottlingQuotaExceededException::new);
THROTTLING_QUOTA_EXCEEDED(89, "The throttling quota has been exceeded.", ThrottlingQuotaExceededException::new),
PRODUCER_FENCED(90, "There is a newer producer with the same transactionalId " +
"which fences the current one.", ProducerFencedException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);

View File

@ -31,7 +31,8 @@ import java.util.Map;
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
* - {@link Errors#INVALID_PRODUCER_EPOCH}
* - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=1
* - {@link Errors#PRODUCER_FENCED}
* - {@link Errors#INVALID_TXN_STATE}
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}

View File

@ -38,7 +38,8 @@ import java.util.Map;
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_TXN_STATE}
* - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
* - {@link Errors#INVALID_PRODUCER_EPOCH}
* - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=1
* - {@link Errors#PRODUCER_FENCED}
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
* - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}

View File

@ -32,7 +32,8 @@ import java.util.Map;
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_TXN_STATE}
* - {@link Errors#INVALID_PRODUCER_ID_MAPPING}
* - {@link Errors#INVALID_PRODUCER_EPOCH}
* - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=1
* - {@link Errors#PRODUCER_FENCED}
* - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
*/
public class EndTxnResponse extends AbstractResponse {

View File

@ -26,11 +26,14 @@ import java.util.Map;
/**
* Possible error codes:
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
* - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
*
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED}
* - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
* - {@link Errors#INVALID_PRODUCER_EPOCH} // for version <=3
* - {@link Errors#PRODUCER_FENCED}
*/
public class InitProducerIdResponse extends AbstractResponse {
public final InitProducerIdResponseData data;

View File

@ -18,7 +18,9 @@
"type": "request",
"name": "AddOffsetsToTxnRequest",
// Version 1 is the same as version 0.
"validVersions": "0-1",
//
// Version 2 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-2",
"flexibleVersions": "none",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",

View File

@ -18,7 +18,9 @@
"type": "response",
"name": "AddOffsetsToTxnResponse",
// Starting in version 1, on quota violation brokers send out responses before throttling.
"validVersions": "0-1",
//
// Version 2 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-2",
"flexibleVersions": "none",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

View File

@ -18,7 +18,9 @@
"type": "request",
"name": "AddPartitionsToTxnRequest",
// Version 1 is the same as version 0.
"validVersions": "0-1",
//
// Version 2 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-2",
"flexibleVersions": "none",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",

View File

@ -18,7 +18,9 @@
"type": "response",
"name": "AddPartitionsToTxnResponse",
// Starting in version 1, on quota violation brokers send out responses before throttling.
"validVersions": "0-1",
//
// Version 2 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-2",
"flexibleVersions": "none",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

View File

@ -18,7 +18,9 @@
"type": "request",
"name": "EndTxnRequest",
// Version 1 is the same as version 0.
"validVersions": "0-1",
//
// Version 2 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-2",
"flexibleVersions": "none",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",

View File

@ -18,7 +18,9 @@
"type": "response",
"name": "EndTxnResponse",
// Starting in version 1, on quota violation, brokers send out responses before throttling.
"validVersions": "0-1",
//
// Version 2 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-2",
"flexibleVersions": "none",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

View File

@ -22,7 +22,9 @@
// Version 2 is the first flexible version.
//
// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error
"validVersions": "0-3",
//
// Version 4 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-4",
"flexibleVersions": "2+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+", "entityType": "transactionalId",

View File

@ -22,7 +22,9 @@
// Version 2 is the first flexible version.
//
// Version 3 is the same as version 2.
"validVersions": "0-3",
//
// Version 4 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-4",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,

View File

@ -1293,7 +1293,7 @@ public class TransactionManagerTest {
assertTrue(transactionManager.hasAbortableError());
transactionManager.beginAbort();
runUntil(responseFuture::isDone);
assertFutureFailed(responseFuture);
assertProduceFutureFailed(responseFuture);
// No partitions added, so no need to prepare EndTxn response
runUntil(transactionManager::isReady);
@ -1349,8 +1349,8 @@ public class TransactionManagerTest {
transactionManager.beginAbort();
runUntil(transactionManager::isReady);
// neither produce request has been sent, so they should both be failed immediately
assertFutureFailed(authorizedTopicProduceFuture);
assertFutureFailed(unauthorizedTopicProduceFuture);
assertProduceFutureFailed(authorizedTopicProduceFuture);
assertProduceFutureFailed(unauthorizedTopicProduceFuture);
assertFalse(transactionManager.hasPartitionsToAdd());
assertFalse(accumulator.hasIncomplete());
@ -1407,7 +1407,7 @@ public class TransactionManagerTest {
prepareProduceResponse(Errors.NONE, producerId, epoch);
runUntil(authorizedTopicProduceFuture::isDone);
assertFutureFailed(unauthorizedTopicProduceFuture);
assertProduceFutureFailed(unauthorizedTopicProduceFuture);
assertNotNull(authorizedTopicProduceFuture.get());
assertTrue(authorizedTopicProduceFuture.isDone());
@ -1539,7 +1539,45 @@ public class TransactionManagerTest {
}
@Test
public void testProducerFencedException() throws InterruptedException {
public void testProducerFencedExceptionInInitProducerId() {
verifyProducerFencedForInitProducerId(Errors.PRODUCER_FENCED);
}
@Test
public void testInvalidProducerEpochConvertToProducerFencedInInitProducerId() {
verifyProducerFencedForInitProducerId(Errors.INVALID_PRODUCER_EPOCH);
}
private void verifyProducerFencedForInitProducerId(Errors error) {
TransactionalRequestResult result = transactionManager.initializeTransactions();
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
prepareInitPidResponse(error, false, producerId, epoch);
runUntil(transactionManager::hasError);
assertEquals(ProducerFencedException.class, result.error().getClass());
assertThrows(ProducerFencedException.class, () -> transactionManager.beginTransaction());
assertThrows(ProducerFencedException.class, () -> transactionManager.beginCommit());
assertThrows(ProducerFencedException.class, () -> transactionManager.beginAbort());
assertThrows(ProducerFencedException.class, () -> transactionManager.sendOffsetsToTransaction(
Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
}
@Test
public void testProducerFencedInAddPartitionToTxn() throws InterruptedException {
verifyProducerFencedForAddPartitionsToTxn(Errors.PRODUCER_FENCED);
}
@Test
public void testInvalidProducerEpochConvertToProducerFencedInAddPartitionToTxn() throws InterruptedException {
verifyProducerFencedForAddPartitionsToTxn(Errors.INVALID_PRODUCER_EPOCH);
}
private void verifyProducerFencedForAddPartitionsToTxn(Errors error) throws InterruptedException {
doInitTransactions();
transactionManager.beginTransaction();
@ -1549,9 +1587,37 @@ public class TransactionManagerTest {
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, producerId, epoch);
prepareAddPartitionsToTxnResponse(error, tp0, epoch, producerId);
verifyProducerFenced(responseFuture);
}
@Test
public void testProducerFencedInAddOffSetsToTxn() throws InterruptedException {
verifyProducerFencedForAddOffsetsToTxn(Errors.INVALID_PRODUCER_EPOCH);
}
@Test
public void testInvalidProducerEpochConvertToProducerFencedInAddOffSetsToTxn() throws InterruptedException {
verifyProducerFencedForAddOffsetsToTxn(Errors.INVALID_PRODUCER_EPOCH);
}
private void verifyProducerFencedForAddOffsetsToTxn(Errors error) throws InterruptedException {
doInitTransactions();
transactionManager.beginTransaction();
transactionManager.failIfNotReadyForSend();
transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(consumerGroupId));
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddOffsetsToTxnResponse(error, consumerGroupId, producerId, epoch);
verifyProducerFenced(responseFuture);
}
private void verifyProducerFenced(Future<RecordMetadata> responseFuture) throws InterruptedException {
runUntil(responseFuture::isDone);
assertTrue(transactionManager.hasError());
@ -1571,6 +1637,68 @@ public class TransactionManagerTest {
Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
}
@Test
public void testInvalidProducerEpochConvertToProducerFencedInEndTxn() throws InterruptedException {
doInitTransactions();
transactionManager.beginTransaction();
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
TransactionalRequestResult commitResult = transactionManager.beginCommit();
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
prepareProduceResponse(Errors.NONE, producerId, epoch);
prepareEndTxnResponse(Errors.INVALID_PRODUCER_EPOCH, TransactionResult.COMMIT, producerId, epoch);
runUntil(commitResult::isCompleted);
runUntil(responseFuture::isDone);
// make sure the exception was thrown directly from the follow-up calls.
assertThrows(KafkaException.class, () -> transactionManager.beginTransaction());
assertThrows(KafkaException.class, () -> transactionManager.beginCommit());
assertThrows(KafkaException.class, () -> transactionManager.beginAbort());
assertThrows(KafkaException.class, () -> transactionManager.sendOffsetsToTransaction(
Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
}
@Test
public void testInvalidProducerEpochFromProduce() throws InterruptedException {
doInitTransactions();
transactionManager.beginTransaction();
transactionManager.failIfNotReadyForSend();
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, producerId, epoch);
prepareProduceResponse(Errors.NONE, producerId, epoch);
sender.runOnce();
runUntil(responseFuture::isDone);
assertFalse(transactionManager.hasError());
transactionManager.beginCommit();
TransactionManager.TxnRequestHandler handler = transactionManager.nextRequest(false);
// First we will get an EndTxn for commit.
assertNotNull(handler);
assertTrue(handler.requestBuilder() instanceof EndTxnRequest.Builder);
handler = transactionManager.nextRequest(false);
// Second we will see an InitPid for handling InvalidProducerEpoch.
assertNotNull(handler);
assertTrue(handler.requestBuilder() instanceof InitProducerIdRequest.Builder);
}
@Test
public void testDisallowCommitOnProduceFailure() throws InterruptedException {
doInitTransactions();
@ -3086,10 +3214,7 @@ public class TransactionManagerTest {
}
private void verifyCommitOrAbortTransactionRetriable(TransactionResult firstTransactionResult,
TransactionResult retryTransactionResult)
throws InterruptedException {
final short epoch = 1;
TransactionResult retryTransactionResult) throws InterruptedException {
doInitTransactions();
transactionManager.beginTransaction();
@ -3376,7 +3501,7 @@ public class TransactionManagerTest {
}
}
private void assertFutureFailed(Future<RecordMetadata> future) throws InterruptedException {
private void assertProduceFutureFailed(Future<RecordMetadata> future) throws InterruptedException {
assertTrue(future.isDone());
try {

View File

@ -211,7 +211,7 @@ class TransactionCoordinator(brokerId: Int,
Left(Errors.CONCURRENT_TRANSACTIONS)
}
else if (!expectedProducerIdAndEpoch.forall(isValidProducerId)) {
Left(Errors.INVALID_PRODUCER_EPOCH)
Left(Errors.PRODUCER_FENCED)
} else {
// caller should have synchronized on txnMetadata already
txnMetadata.state match {
@ -278,7 +278,7 @@ class TransactionCoordinator(brokerId: Int,
if (txnMetadata.producerId != producerId) {
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
} else if (txnMetadata.producerEpoch != producerEpoch) {
Left(Errors.INVALID_PRODUCER_EPOCH)
Left(Errors.PRODUCER_FENCED)
} else if (txnMetadata.pendingTransitionInProgress) {
// return a retriable exception to let the client backoff and retry
Left(Errors.CONCURRENT_TRANSACTIONS)
@ -382,7 +382,7 @@ class TransactionCoordinator(brokerId: Int,
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
// Strict equality is enforced on the client side requests, as they shouldn't bump the producer epoch.
else if ((isFromClient && producerEpoch != txnMetadata.producerEpoch) || producerEpoch < txnMetadata.producerEpoch)
Left(Errors.INVALID_PRODUCER_EPOCH)
Left(Errors.PRODUCER_FENCED)
else if (txnMetadata.pendingTransitionInProgress && txnMetadata.pendingState.get != PrepareEpochFence)
Left(Errors.CONCURRENT_TRANSACTIONS)
else txnMetadata.state match {
@ -456,7 +456,7 @@ class TransactionCoordinator(brokerId: Int,
if (txnMetadata.producerId != producerId)
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
else if (txnMetadata.producerEpoch != producerEpoch)
Left(Errors.INVALID_PRODUCER_EPOCH)
Left(Errors.PRODUCER_FENCED)
else if (txnMetadata.pendingTransitionInProgress)
Left(Errors.CONCURRENT_TRANSACTIONS)
else txnMetadata.state match {
@ -539,7 +539,7 @@ class TransactionCoordinator(brokerId: Int,
s"${txnIdAndPidEpoch.transactionalId} due to timeout")
case error@(Errors.INVALID_PRODUCER_ID_MAPPING |
Errors.INVALID_PRODUCER_EPOCH |
Errors.PRODUCER_FENCED |
Errors.CONCURRENT_TRANSACTIONS) =>
debug(s"Rollback of ongoing transaction for transactionalId ${txnIdAndPidEpoch.transactionalId} " +
s"has been cancelled due to error $error")

View File

@ -249,10 +249,10 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
// the transaction log, so a retry that spans a coordinator change will fail. We expect this to be a rare case.
Right(producerEpoch, lastProducerEpoch)
else {
// Otherwise, the producer has a fenced epoch and should receive an INVALID_PRODUCER_EPOCH error
// Otherwise, the producer has a fenced epoch and should receive an PRODUCER_FENCED error
info(s"Expected producer epoch $expectedEpoch does not match current " +
s"producer epoch $producerEpoch or previous producer epoch $lastProducerEpoch")
Left(Errors.INVALID_PRODUCER_EPOCH)
Left(Errors.PRODUCER_FENCED)
}
}

View File

@ -1989,11 +1989,19 @@ class KafkaApis(val requestChannel: RequestChannel,
def sendResponseCallback(result: InitProducerIdResult): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val finalError =
if (initProducerIdRequest.version < 4 && result.error == Errors.PRODUCER_FENCED) {
// For older clients, they could not understand the new PRODUCER_FENCED error code,
// so we need to return the INVALID_PRODUCER_EPOCH to have the same client handling logic.
Errors.INVALID_PRODUCER_EPOCH
} else {
result.error
}
val responseData = new InitProducerIdResponseData()
.setProducerId(result.producerId)
.setProducerEpoch(result.producerEpoch)
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(result.error.code)
.setErrorCode(finalError.code)
val responseBody = new InitProducerIdResponse(responseData)
trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
responseBody
@ -2022,8 +2030,16 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) {
def sendResponseCallback(error: Errors): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val finalError =
if (endTxnRequest.version < 2 && error == Errors.PRODUCER_FENCED) {
// For older clients, they could not understand the new PRODUCER_FENCED error code,
// so we need to return the INVALID_PRODUCER_EPOCH to have the same client handling logic.
Errors.INVALID_PRODUCER_EPOCH
} else {
error
}
val responseBody = new EndTxnResponse(new EndTxnResponseData()
.setErrorCode(error.code())
.setErrorCode(finalError.code)
.setThrottleTimeMs(requestThrottleMs))
trace(s"Completed ${endTxnRequest.data.transactionalId}'s EndTxnRequest " +
s"with committed: ${endTxnRequest.data.committed}, " +
@ -2191,12 +2207,22 @@ class KafkaApis(val requestChannel: RequestChannel,
} else {
def sendResponseCallback(error: Errors): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val finalError =
if (addPartitionsToTxnRequest.version < 2 && error == Errors.PRODUCER_FENCED) {
// For older clients, they could not understand the new PRODUCER_FENCED error code,
// so we need to return the old INVALID_PRODUCER_EPOCH to have the same client handling logic.
Errors.INVALID_PRODUCER_EPOCH
} else {
error
}
val responseBody: AddPartitionsToTxnResponse = new AddPartitionsToTxnResponse(requestThrottleMs,
partitionsToAdd.map{tp => (tp, error)}.toMap.asJava)
partitionsToAdd.map{tp => (tp, finalError)}.toMap.asJava)
trace(s"Completed $transactionalId's AddPartitionsToTxnRequest with partitions $partitionsToAdd: errors: $error from client ${request.header.clientId}")
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
}
@ -2230,9 +2256,18 @@ class KafkaApis(val requestChannel: RequestChannel,
else {
def sendResponseCallback(error: Errors): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val finalError =
if (addOffsetsToTxnRequest.version < 2 && error == Errors.PRODUCER_FENCED) {
// For older clients, they could not understand the new PRODUCER_FENCED error code,
// so we need to return the old INVALID_PRODUCER_EPOCH to have the same client handling logic.
Errors.INVALID_PRODUCER_EPOCH
} else {
error
}
val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(
new AddOffsetsToTxnResponseData()
.setErrorCode(error.code)
.setErrorCode(finalError.code)
.setThrottleTimeMs(requestThrottleMs))
trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId on partition " +
s"$offsetTopicPartition: errors: $error from client ${request.header.clientId}")

View File

@ -269,7 +269,7 @@ class TransactionCoordinatorTest {
}
@Test
def shouldRespondWithInvalidTnxProduceEpochOnAddPartitionsWhenEpochsAreDifferent(): Unit = {
def shouldRespondWithProducerFencedOnAddPartitionsWhenEpochsAreDifferent(): Unit = {
EasyMock.expect(transactionManager.getTransactionState(EasyMock.eq(transactionalId)))
.andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
new TransactionMetadata(transactionalId, 0, 0, 10, 9, 0, PrepareCommit, mutable.Set.empty, 0, 0)))))
@ -277,7 +277,7 @@ class TransactionCoordinatorTest {
EasyMock.replay(transactionManager)
coordinator.handleAddPartitionsToTransaction(transactionalId, 0L, 0, partitions, errorsCallback)
assertEquals(Errors.INVALID_PRODUCER_EPOCH, error)
assertEquals(Errors.PRODUCER_FENCED, error)
}
@Test
@ -367,7 +367,7 @@ class TransactionCoordinatorTest {
EasyMock.replay(transactionManager)
coordinator.handleEndTransaction(transactionalId, producerId, 0, TransactionResult.COMMIT, errorsCallback)
assertEquals(Errors.INVALID_PRODUCER_EPOCH, error)
assertEquals(Errors.PRODUCER_FENCED, error)
EasyMock.verify(transactionManager)
}
@ -518,7 +518,7 @@ class TransactionCoordinatorTest {
EasyMock.replay(transactionManager)
coordinator.handleEndTransaction(transactionalId, producerId, requestEpoch, TransactionResult.COMMIT, errorsCallback)
assertEquals(Errors.INVALID_PRODUCER_EPOCH, error)
assertEquals(Errors.PRODUCER_FENCED, error)
EasyMock.verify(transactionManager)
}
@ -609,7 +609,7 @@ class TransactionCoordinatorTest {
coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, None, initProducerIdMockCallback)
assertEquals(InitProducerIdResult(-1, -1, Errors.INVALID_PRODUCER_EPOCH), result)
assertEquals(InitProducerIdResult(-1, -1, Errors.PRODUCER_FENCED), result)
EasyMock.verify(transactionManager)
}
@ -734,7 +734,6 @@ class TransactionCoordinatorTest {
EasyMock.verify(transactionManager)
}
@Test
def testInitProducerIdWithNoLastProducerData(): Unit = {
// If the metadata doesn't include the previous producer data (for example, if it was written to the log by a broker
@ -751,7 +750,7 @@ class TransactionCoordinatorTest {
// Simulate producer trying to continue after new producer has already been initialized
coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, Some(new ProducerIdAndEpoch(producerId, producerEpoch)),
initProducerIdMockCallback)
assertEquals(InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Errors.INVALID_PRODUCER_EPOCH), result)
assertEquals(InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Errors.PRODUCER_FENCED), result)
}
@Test
@ -769,7 +768,7 @@ class TransactionCoordinatorTest {
// Simulate producer trying to continue after new producer has already been initialized
coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, Some(new ProducerIdAndEpoch(producerId, producerEpoch)),
initProducerIdMockCallback)
assertEquals(InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Errors.INVALID_PRODUCER_EPOCH), result)
assertEquals(InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Errors.PRODUCER_FENCED), result)
}
@Test
@ -843,7 +842,7 @@ class TransactionCoordinatorTest {
// Simulate old producer trying to continue from epoch 10
coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, Some(new ProducerIdAndEpoch(producerId, 10)),
initProducerIdMockCallback)
assertEquals(InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Errors.INVALID_PRODUCER_EPOCH), result)
assertEquals(InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Errors.PRODUCER_FENCED), result)
}
@Test
@ -929,7 +928,7 @@ class TransactionCoordinatorTest {
// Validate that producer with old producer ID and stale epoch is fenced
coordinator.handleInitProducerId(transactionalId, txnTimeoutMs, Some(new ProducerIdAndEpoch(producerId,
(Short.MaxValue - 2).toShort)), initProducerIdMockCallback)
assertEquals(InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Errors.INVALID_PRODUCER_EPOCH), result)
assertEquals(InitProducerIdResult(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, Errors.PRODUCER_FENCED), result)
}
@Test
@ -992,7 +991,7 @@ class TransactionCoordinatorTest {
EasyMock.replay(transactionManager, transactionMarkerChannelManager)
def checkOnEndTransactionComplete(txnIdAndPidEpoch: TransactionalIdAndProducerIdEpoch)(error: Errors): Unit = {
assertEquals(Errors.INVALID_PRODUCER_EPOCH, error)
assertEquals(Errors.PRODUCER_FENCED, error)
}
coordinator.abortTimedOutTransactions(checkOnEndTransactionComplete)

View File

@ -457,7 +457,7 @@ class TransactionMetadataTest {
val result = txnMetadata.prepareIncrementProducerEpoch(30000, Some((lastProducerEpoch - 1).toShort),
time.milliseconds())
assertEquals(Left(Errors.INVALID_PRODUCER_EPOCH), result)
assertEquals(Left(Errors.PRODUCER_FENCED), result)
}
private def testRotateProducerIdInOngoingState(state: TransactionState): Unit = {

View File

@ -34,7 +34,7 @@ import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.SyncGroupCallback
import kafka.coordinator.group.JoinGroupResult
import kafka.coordinator.group.SyncGroupResult
import kafka.coordinator.group.{GroupCoordinator, GroupSummary, MemberSummary}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.AppendOrigin
import kafka.network.RequestChannel
import kafka.network.RequestChannel.SendResponse
@ -68,6 +68,7 @@ import org.apache.kafka.common.resource.PatternType
import org.apache.kafka.common.resource.ResourcePattern
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.ProducerIdAndEpoch
import org.apache.kafka.server.authorizer.Action
import org.apache.kafka.server.authorizer.AuthorizationResult
import org.apache.kafka.server.authorizer.Authorizer
@ -438,46 +439,283 @@ class KafkaApisTest {
val topic = "topic"
setupBasicMetadataCache(topic, numPartitions = 2)
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion) {
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
val topicPartition = new TopicPartition(topic, 1)
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
val responseCallback: Capture[Map[TopicPartition, Errors] => Unit] = EasyMock.newCapture()
val topicPartition = new TopicPartition(topic, 1)
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
val responseCallback: Capture[Map[TopicPartition, Errors] => Unit] = EasyMock.newCapture()
val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
val groupId = "groupId"
val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty())
val groupId = "groupId"
val offsetCommitRequest = new TxnOffsetCommitRequest.Builder(
"txnId",
groupId,
15L,
0.toShort,
Map(topicPartition -> partitionOffsetCommitData).asJava,
false
).build(1)
val request = buildRequest(offsetCommitRequest)
val producerId = 15L
val epoch = 0.toShort
EasyMock.expect(groupCoordinator.handleTxnCommitOffsets(
EasyMock.eq(groupId),
EasyMock.eq(15L),
EasyMock.eq(0),
EasyMock.anyString(),
EasyMock.eq(Option.empty),
EasyMock.anyInt(),
EasyMock.anyObject(),
EasyMock.capture(responseCallback)
)).andAnswer(
() => responseCallback.getValue.apply(Map(topicPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS)))
val offsetCommitRequest = new TxnOffsetCommitRequest.Builder(
"txnId",
groupId,
producerId,
epoch,
Map(topicPartition -> partitionOffsetCommitData).asJava,
false
).build(version.toShort)
val request = buildRequest(offsetCommitRequest)
EasyMock.expect(groupCoordinator.handleTxnCommitOffsets(
EasyMock.eq(groupId),
EasyMock.eq(producerId),
EasyMock.eq(epoch),
EasyMock.anyString(),
EasyMock.eq(Option.empty),
EasyMock.anyInt(),
EasyMock.anyObject(),
EasyMock.capture(responseCallback)
)).andAnswer(
() => responseCallback.getValue.apply(Map(topicPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS)))
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator)
createKafkaApis().handleTxnOffsetCommitRequest(request)
createKafkaApis().handleTxnOffsetCommitRequest(request)
val response = readResponse(ApiKeys.TXN_OFFSET_COMMIT, offsetCommitRequest, capturedResponse)
.asInstanceOf[TxnOffsetCommitResponse]
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, response.errors().get(topicPartition))
val response = readResponse(ApiKeys.TXN_OFFSET_COMMIT, offsetCommitRequest, capturedResponse)
.asInstanceOf[TxnOffsetCommitResponse]
if (version < 2) {
assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, response.errors().get(topicPartition))
} else {
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, response.errors().get(topicPartition))
}
}
}
@Test
def shouldReplaceProducerFencedWithInvalidProducerEpochInInitProducerIdWithOlderClient(): Unit = {
val topic = "topic"
setupBasicMetadataCache(topic, numPartitions = 2)
for (version <- ApiKeys.INIT_PRODUCER_ID.oldestVersion to ApiKeys.INIT_PRODUCER_ID.latestVersion) {
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
val responseCallback: Capture[InitProducerIdResult => Unit] = EasyMock.newCapture()
val transactionalId = "txnId"
val producerId = if (version < 3)
RecordBatch.NO_PRODUCER_ID
else
15
val epoch = if (version < 3)
RecordBatch.NO_PRODUCER_EPOCH
else
0.toShort
val txnTimeoutMs = TimeUnit.MINUTES.toMillis(15).toInt
val initProducerIdRequest = new InitProducerIdRequest.Builder(
new InitProducerIdRequestData()
.setTransactionalId(transactionalId)
.setTransactionTimeoutMs(txnTimeoutMs)
.setProducerId(producerId)
.setProducerEpoch(epoch)
).build(version.toShort)
val request = buildRequest(initProducerIdRequest)
val expectedProducerIdAndEpoch = if (version < 3)
Option.empty
else
Option(new ProducerIdAndEpoch(producerId, epoch))
EasyMock.expect(txnCoordinator.handleInitProducerId(
EasyMock.eq(transactionalId),
EasyMock.eq(txnTimeoutMs),
EasyMock.eq(expectedProducerIdAndEpoch),
EasyMock.capture(responseCallback)
)).andAnswer(
() => responseCallback.getValue.apply(InitProducerIdResult(producerId, epoch, Errors.PRODUCER_FENCED)))
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
createKafkaApis().handleInitProducerIdRequest(request)
val response = readResponse(ApiKeys.INIT_PRODUCER_ID, initProducerIdRequest, capturedResponse)
.asInstanceOf[InitProducerIdResponse]
if (version < 4) {
assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
} else {
assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
}
}
}
@Test
def shouldReplaceProducerFencedWithInvalidProducerEpochInAddOffsetToTxnWithOlderClient(): Unit = {
val topic = "topic"
setupBasicMetadataCache(topic, numPartitions = 2)
for (version <- ApiKeys.ADD_OFFSETS_TO_TXN.oldestVersion to ApiKeys.ADD_OFFSETS_TO_TXN.latestVersion) {
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator, txnCoordinator)
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
val responseCallback: Capture[Errors => Unit] = EasyMock.newCapture()
val groupId = "groupId"
val transactionalId = "txnId"
val producerId = 15L
val epoch = 0.toShort
val addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(
new AddOffsetsToTxnRequestData()
.setGroupId(groupId)
.setTransactionalId(transactionalId)
.setProducerId(producerId)
.setProducerEpoch(epoch)
).build(version.toShort)
val request = buildRequest(addOffsetsToTxnRequest)
val partition = 1
EasyMock.expect(groupCoordinator.partitionFor(
EasyMock.eq(groupId)
)).andReturn(partition)
EasyMock.expect(txnCoordinator.handleAddPartitionsToTransaction(
EasyMock.eq(transactionalId),
EasyMock.eq(producerId),
EasyMock.eq(epoch),
EasyMock.eq(Set(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition))),
EasyMock.capture(responseCallback)
)).andAnswer(
() => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator, groupCoordinator)
createKafkaApis().handleAddOffsetsToTxnRequest(request)
val response = readResponse(ApiKeys.ADD_OFFSETS_TO_TXN, addOffsetsToTxnRequest, capturedResponse)
.asInstanceOf[AddOffsetsToTxnResponse]
if (version < 2) {
assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
} else {
assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
}
}
}
@Test
def shouldReplaceProducerFencedWithInvalidProducerEpochInAddPartitionToTxnWithOlderClient(): Unit = {
val topic = "topic"
setupBasicMetadataCache(topic, numPartitions = 2)
for (version <- ApiKeys.ADD_PARTITIONS_TO_TXN.oldestVersion to ApiKeys.ADD_PARTITIONS_TO_TXN.latestVersion) {
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
val responseCallback: Capture[Errors => Unit] = EasyMock.newCapture()
val transactionalId = "txnId"
val producerId = 15L
val epoch = 0.toShort
val partition = 1
val topicPartition = new TopicPartition(topic, partition)
val addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(
transactionalId,
producerId,
epoch,
Collections.singletonList(topicPartition)
).build(version.toShort)
val request = buildRequest(addPartitionsToTxnRequest)
EasyMock.expect(txnCoordinator.handleAddPartitionsToTransaction(
EasyMock.eq(transactionalId),
EasyMock.eq(producerId),
EasyMock.eq(epoch),
EasyMock.eq(Set(topicPartition)),
EasyMock.capture(responseCallback)
)).andAnswer(
() => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
createKafkaApis().handleAddPartitionToTxnRequest(request)
val response = readResponse(ApiKeys.ADD_PARTITIONS_TO_TXN, addPartitionsToTxnRequest, capturedResponse)
.asInstanceOf[AddPartitionsToTxnResponse]
if (version < 2) {
assertEquals(Collections.singletonMap(topicPartition, Errors.INVALID_PRODUCER_EPOCH), response.errors())
} else {
assertEquals(Collections.singletonMap(topicPartition, Errors.PRODUCER_FENCED), response.errors())
}
}
}
@Test
def shouldReplaceProducerFencedWithInvalidProducerEpochInEndTxnWithOlderClient(): Unit = {
val topic = "topic"
setupBasicMetadataCache(topic, numPartitions = 2)
for (version <- ApiKeys.END_TXN.oldestVersion to ApiKeys.END_TXN.latestVersion) {
EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
val responseCallback: Capture[Errors => Unit] = EasyMock.newCapture()
val transactionalId = "txnId"
val producerId = 15L
val epoch = 0.toShort
val endTxnRequest = new EndTxnRequest.Builder(
new EndTxnRequestData()
.setTransactionalId(transactionalId)
.setProducerId(producerId)
.setProducerEpoch(epoch)
.setCommitted(true)
).build(version.toShort)
val request = buildRequest(endTxnRequest)
EasyMock.expect(txnCoordinator.handleEndTransaction(
EasyMock.eq(transactionalId),
EasyMock.eq(producerId),
EasyMock.eq(epoch),
EasyMock.eq(TransactionResult.COMMIT),
EasyMock.capture(responseCallback)
)).andAnswer(
() => responseCallback.getValue.apply(Errors.PRODUCER_FENCED))
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
createKafkaApis().handleEndTxnRequest(request)
val response = readResponse(ApiKeys.END_TXN, endTxnRequest, capturedResponse)
.asInstanceOf[EndTxnResponse]
if (version < 2) {
assertEquals(Errors.INVALID_PRODUCER_EPOCH.code, response.data.errorCode)
} else {
assertEquals(Errors.PRODUCER_FENCED.code, response.data.errorCode)
}
}
}
@Test
@ -609,25 +847,25 @@ class KafkaApisTest {
@Test
def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndLeaderEpoch(): Unit = {
shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(
LeaderAndIsr.initialLeaderEpoch + 2, true)
LeaderAndIsr.initialLeaderEpoch + 2, deletePartition = true)
}
@Test
def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndDeleteSentinel(): Unit = {
shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(
LeaderAndIsr.EpochDuringDelete, true)
LeaderAndIsr.EpochDuringDelete, deletePartition = true)
}
@Test
def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlagAndNoEpochSentinel(): Unit = {
shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(
LeaderAndIsr.NoEpoch, true)
LeaderAndIsr.NoEpoch, deletePartition = true)
}
@Test
def shouldNotResignCoordinatorsIfStopReplicaReceivedWithoutDeleteFlag(): Unit = {
shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(
LeaderAndIsr.initialLeaderEpoch + 2, false)
LeaderAndIsr.initialLeaderEpoch + 2, deletePartition = false)
}
def shouldResignCoordinatorsIfStopReplicaReceivedWithDeleteFlag(leaderEpoch: Int,