MINOR: Push down logic from TransactionManager to TxnPartitionEntry (#14591)

And encapsulate TxnPartitionEntry state.

This makes it easier to understand the behavior and the paths through
which the state is updated.

Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
Ismael Juma 2023-10-28 07:27:20 -07:00 committed by GitHub
parent abde0e0878
commit fa36a7f2d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 305 additions and 224 deletions

View File

@ -473,11 +473,11 @@ public final class ProducerBatch {
recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
}
public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) {
public void resetProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence) {
log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}",
this.baseSequence(), this.topicPartition, baseSequence);
reopened = true;
recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional);
recordsBuilder.reopenAndRewriteProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence, isTransactional());
}
/**

View File

@ -818,7 +818,7 @@ public class RecordAccumulator {
}
private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPartition tp) {
ProducerIdAndEpoch producerIdAndEpoch = null;
ProducerIdAndEpoch producerIdAndEpoch;
if (transactionManager != null) {
if (!transactionManager.isSendToPartitionAllowed(tp))
return true;

View File

@ -48,7 +48,6 @@ import org.apache.kafka.common.message.FindCoordinatorResponseData.Coordinator;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
@ -84,7 +83,6 @@ import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.function.Supplier;
/**
@ -92,7 +90,6 @@ import java.util.function.Supplier;
*/
public class TransactionManager {
private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
private final Logger log;
private final String transactionalId;
@ -271,7 +268,7 @@ public class TransactionManager {
this.partitionsWithUnresolvedSequences = new HashMap<>();
this.partitionsToRewriteSequences = new HashSet<>();
this.retryBackoffMs = retryBackoffMs;
this.txnPartitionMap = new TxnPartitionMap();
this.txnPartitionMap = new TxnPartitionMap(logContext);
this.apiVersions = apiVersions;
}
@ -521,7 +518,7 @@ public class TransactionManager {
}
private void resetSequenceForPartition(TopicPartition topicPartition) {
txnPartitionMap.topicPartitions.remove(topicPartition);
txnPartitionMap.remove(topicPartition);
this.partitionsWithUnresolvedSequences.remove(topicPartition);
}
@ -572,28 +569,25 @@ public class TransactionManager {
/**
* Returns the next sequence number to be written to the given TopicPartition.
*/
synchronized Integer sequenceNumber(TopicPartition topicPartition) {
return txnPartitionMap.getOrCreate(topicPartition).nextSequence;
synchronized int sequenceNumber(TopicPartition topicPartition) {
return txnPartitionMap.getOrCreate(topicPartition).nextSequence();
}
/**
* Returns the current producer id/epoch of the given TopicPartition.
*/
synchronized ProducerIdAndEpoch producerIdAndEpoch(TopicPartition topicPartition) {
return txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch;
return txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch();
}
synchronized void incrementSequenceNumber(TopicPartition topicPartition, int increment) {
Integer currentSequence = sequenceNumber(topicPartition);
currentSequence = DefaultRecordBatch.incrementSequence(currentSequence, increment);
txnPartitionMap.get(topicPartition).nextSequence = currentSequence;
txnPartitionMap.get(topicPartition).incrementSequence(increment);
}
synchronized void addInFlightBatch(ProducerBatch batch) {
if (!batch.hasSequence())
throw new IllegalStateException("Can't track batch for partition " + batch.topicPartition + " when sequence is not set.");
txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.add(batch);
txnPartitionMap.get(batch.topicPartition).addInflightBatch(batch);
}
/**
@ -606,33 +600,21 @@ public class TransactionManager {
synchronized int firstInFlightSequence(TopicPartition topicPartition) {
if (!hasInflightBatches(topicPartition))
return RecordBatch.NO_SEQUENCE;
SortedSet<ProducerBatch> inflightBatches = txnPartitionMap.get(topicPartition).inflightBatchesBySequence;
if (inflightBatches.isEmpty())
return RecordBatch.NO_SEQUENCE;
else
return inflightBatches.first().baseSequence();
ProducerBatch batch = nextBatchBySequence(topicPartition);
return batch == null ? RecordBatch.NO_SEQUENCE : batch.baseSequence();
}
synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
SortedSet<ProducerBatch> queue = txnPartitionMap.get(topicPartition).inflightBatchesBySequence;
return queue.isEmpty() ? null : queue.first();
return txnPartitionMap.nextBatchBySequence(topicPartition);
}
synchronized void removeInFlightBatch(ProducerBatch batch) {
if (hasInflightBatches(batch.topicPartition)) {
txnPartitionMap.get(batch.topicPartition).inflightBatchesBySequence.remove(batch);
}
if (hasInflightBatches(batch.topicPartition))
txnPartitionMap.removeInFlightBatch(batch);
}
private int maybeUpdateLastAckedSequence(TopicPartition topicPartition, int sequence) {
int lastAckedSequence = lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER);
if (sequence > lastAckedSequence) {
txnPartitionMap.get(topicPartition).lastAckedSequence = sequence;
return sequence;
}
return lastAckedSequence;
return txnPartitionMap.maybeUpdateLastAckedSequence(topicPartition, sequence);
}
synchronized OptionalInt lastAckedSequence(TopicPartition topicPartition) {
@ -647,18 +629,7 @@ public class TransactionManager {
if (response.baseOffset == ProduceResponse.INVALID_OFFSET)
return;
long lastOffset = response.baseOffset + batch.recordCount - 1;
OptionalLong lastAckedOffset = lastAckedOffset(batch.topicPartition);
// It might happen that the TransactionManager has been reset while a request was reenqueued and got a valid
// response for this. This can happen only if the producer is only idempotent (not transactional) and in
// this case there will be no tracked bookkeeper entry about it, so we have to insert one.
if (!lastAckedOffset.isPresent() && !isTransactional()) {
txnPartitionMap.getOrCreate(batch.topicPartition);
}
if (lastOffset > lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET)) {
txnPartitionMap.get(batch.topicPartition).lastAckedOffset = lastOffset;
} else {
log.trace("Partition {} keeps lastOffset at {}", batch.topicPartition, lastOffset);
}
txnPartitionMap.updateLastAckedOffset(batch.topicPartition, isTransactional(), lastOffset);
}
public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
@ -724,50 +695,18 @@ public class TransactionManager {
if (!isTransactional()) {
requestEpochBumpForPartition(batch.topicPartition);
} else {
adjustSequencesDueToFailedBatch(batch);
txnPartitionMap.adjustSequencesDueToFailedBatch(batch);
}
}
}
}
// If a batch is failed fatally, the sequence numbers for future batches bound for the partition must be adjusted
// so that they don't fail with the OutOfOrderSequenceException.
//
// This method must only be called when we know that the batch is question has been unequivocally failed by the broker,
// ie. it has received a confirmed fatal status code like 'Message Too Large' or something similar.
private void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
if (!txnPartitionMap.contains(batch.topicPartition))
// Sequence numbers are not being tracked for this partition. This could happen if the producer id was just
// reset due to a previous OutOfOrderSequenceException.
return;
log.debug("producerId: {}, send to partition {} failed fatally. Reducing future sequence numbers by {}",
batch.producerId(), batch.topicPartition, batch.recordCount);
int currentSequence = sequenceNumber(batch.topicPartition);
currentSequence -= batch.recordCount;
if (currentSequence < 0)
throw new IllegalStateException("Sequence number for partition " + batch.topicPartition + " is going to become negative: " + currentSequence);
setNextSequence(batch.topicPartition, currentSequence);
txnPartitionMap.get(batch.topicPartition).resetSequenceNumbers(inFlightBatch -> {
if (inFlightBatch.baseSequence() < batch.baseSequence())
return;
int newSequence = inFlightBatch.baseSequence() - batch.recordCount;
if (newSequence < 0)
throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence()
+ " for partition " + batch.topicPartition + " is going to become negative: " + newSequence);
inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), newSequence, inFlightBatch.isTransactional());
});
}
synchronized boolean hasInflightBatches(TopicPartition topicPartition) {
return !txnPartitionMap.getOrCreate(topicPartition).inflightBatchesBySequence.isEmpty();
return txnPartitionMap.getOrCreate(topicPartition).hasInflightBatches();
}
synchronized boolean hasStaleProducerIdAndEpoch(TopicPartition topicPartition) {
return !producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch);
return !producerIdAndEpoch.equals(txnPartitionMap.getOrCreate(topicPartition).producerIdAndEpoch());
}
synchronized boolean hasUnresolvedSequences() {
@ -817,7 +756,7 @@ public class TransactionManager {
// For the idempotent producer, bump the epoch
log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +
"Going to bump epoch and reset sequence numbers.", topicPartition,
lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER), sequenceNumber(topicPartition));
lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER), sequenceNumber(topicPartition));
requestEpochBumpForPartition(topicPartition);
}
@ -828,11 +767,7 @@ public class TransactionManager {
}
private boolean isNextSequence(TopicPartition topicPartition, int sequence) {
return sequence - lastAckedSequence(topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) == 1;
}
private void setNextSequence(TopicPartition topicPartition, int sequence) {
txnPartitionMap.get(topicPartition).nextSequence = sequence;
return sequence - lastAckedSequence(topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER) == 1;
}
private boolean isNextSequenceForUnresolvedPartition(TopicPartition topicPartition, int sequence) {
@ -983,7 +918,7 @@ public class TransactionManager {
// come back from the broker, they would also come with an UNKNOWN_PRODUCER_ID error. In this case, we should not
// reset the sequence numbers to the beginning.
return true;
} else if (lastAckedOffset(batch.topicPartition).orElse(NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
} else if (lastAckedOffset(batch.topicPartition).orElse(TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER) < response.logStartOffset) {
// The head of the log has been removed, probably due to the retention time elapsing. In this case,
// we expect to lose the producer state. For the transactional producer, reset the sequences of all
// inflight batches to be from the beginning and retry them, so that the transaction does not need to

View File

@ -18,33 +18,41 @@
package org.apache.kafka.clients.producer.internals;
import java.util.Comparator;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.PrimitiveRef;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
class TxnPartitionEntry {
static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
private final TopicPartition topicPartition;
// The producer id/epoch being used for a given partition.
ProducerIdAndEpoch producerIdAndEpoch;
private ProducerIdAndEpoch producerIdAndEpoch;
// The base sequence of the next batch bound for a given partition.
int nextSequence;
private int nextSequence;
// The sequence number of the last record of the last ack'd batch from the given partition. When there are no
// in flight requests for a partition, the lastAckedSequence(topicPartition) == nextSequence(topicPartition) - 1.
int lastAckedSequence;
private int lastAckedSequence;
// Keep track of the in flight batches bound for a partition, ordered by sequence. This helps us to ensure that
// we continue to order batches by the sequence numbers even when the responses come back out of order during
// leader failover. We add a batch to the queue when it is drained, and remove it when the batch completes
// (either successfully or through a fatal failure).
SortedSet<ProducerBatch> inflightBatchesBySequence;
private SortedSet<ProducerBatch> inflightBatchesBySequence;
// We keep track of the last acknowledged offset on a per partition basis in order to disambiguate UnknownProducer
// responses which are due to the retention period elapsing, and those which are due to actual lost data.
long lastAckedOffset;
private long lastAckedOffset;
// `inflightBatchesBySequence` should only have batches with the same producer id and producer
// epoch, but there is an edge case where we may remove the wrong batch if the comparator
@ -55,15 +63,94 @@ class TxnPartitionEntry {
.thenComparingInt(ProducerBatch::producerEpoch)
.thenComparingInt(ProducerBatch::baseSequence);
TxnPartitionEntry() {
TxnPartitionEntry(TopicPartition topicPartition) {
this.topicPartition = topicPartition;
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.nextSequence = 0;
this.lastAckedSequence = TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
this.inflightBatchesBySequence = new TreeSet<>(PRODUCER_BATCH_COMPARATOR);
}
void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
ProducerIdAndEpoch producerIdAndEpoch() {
return producerIdAndEpoch;
}
int nextSequence() {
return nextSequence;
}
OptionalLong lastAckedOffset() {
if (lastAckedOffset != ProduceResponse.INVALID_OFFSET)
return OptionalLong.of(lastAckedOffset);
return OptionalLong.empty();
}
OptionalInt lastAckedSequence() {
if (lastAckedSequence != TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER)
return OptionalInt.of(lastAckedSequence);
return OptionalInt.empty();
}
boolean hasInflightBatches() {
return !inflightBatchesBySequence.isEmpty();
}
ProducerBatch nextBatchBySequence() {
return inflightBatchesBySequence.isEmpty() ? null : inflightBatchesBySequence.first();
}
void incrementSequence(int increment) {
this.nextSequence = DefaultRecordBatch.incrementSequence(this.nextSequence, increment);
}
void addInflightBatch(ProducerBatch batch) {
inflightBatchesBySequence.add(batch);
}
void setLastAckedOffset(long lastAckedOffset) {
this.lastAckedOffset = lastAckedOffset;
}
void startSequencesAtBeginning(ProducerIdAndEpoch newProducerIdAndEpoch) {
final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
resetSequenceNumbers(inFlightBatch -> {
inFlightBatch.resetProducerState(newProducerIdAndEpoch, sequence.value);
sequence.value += inFlightBatch.recordCount;
});
producerIdAndEpoch = newProducerIdAndEpoch;
nextSequence = sequence.value;
lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
}
int maybeUpdateLastAckedSequence(int sequence) {
if (sequence > lastAckedSequence) {
lastAckedSequence = sequence;
return sequence;
}
return lastAckedSequence;
}
void removeInFlightBatch(ProducerBatch batch) {
inflightBatchesBySequence.remove(batch);
}
void adjustSequencesDueToFailedBatch(long baseSequence, int recordCount) {
decrementSequence(recordCount);
resetSequenceNumbers(inFlightBatch -> {
if (inFlightBatch.baseSequence() < baseSequence)
return;
int newSequence = inFlightBatch.baseSequence() - recordCount;
if (newSequence < 0)
throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence()
+ " for partition " + topicPartition + " is going to become negative: " + newSequence);
inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), newSequence);
});
}
private void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
TreeSet<ProducerBatch> newInflights = new TreeSet<>(PRODUCER_BATCH_COMPARATOR);
for (ProducerBatch inflightBatch : inflightBatchesBySequence) {
resetSequence.accept(inflightBatch);
@ -71,4 +158,16 @@ class TxnPartitionEntry {
}
inflightBatchesBySequence = newInflights;
}
private boolean decrementSequence(int decrement) {
int updatedSequence = nextSequence;
updatedSequence -= decrement;
if (updatedSequence < 0) {
throw new IllegalStateException(
"Sequence number for partition " + topicPartition + " is going to become negative: "
+ updatedSequence);
}
this.nextSequence = updatedSequence;
return true;
}
}

View File

@ -21,26 +21,34 @@ import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.PrimitiveRef;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.slf4j.Logger;
class TxnPartitionMap {
final Map<TopicPartition, TxnPartitionEntry> topicPartitions = new HashMap<>();
private final Logger log;
private final Map<TopicPartition, TxnPartitionEntry> topicPartitions = new HashMap<>();
TxnPartitionMap(LogContext logContext) {
this.log = logContext.logger(TxnPartitionMap.class);
}
TxnPartitionEntry get(TopicPartition topicPartition) {
TxnPartitionEntry ent = topicPartitions.get(topicPartition);
if (ent == null) {
throw new IllegalStateException("Trying to get the sequence number for " + topicPartition +
", but the sequence number was never set for this partition.");
throw new IllegalStateException("Trying to get txnPartitionEntry for " + topicPartition +
", but it was never set for this partition.");
}
return ent;
}
TxnPartitionEntry getOrCreate(TopicPartition topicPartition) {
return topicPartitions.computeIfAbsent(topicPartition, tp -> new TxnPartitionEntry());
return topicPartitions.computeIfAbsent(topicPartition, tp -> new TxnPartitionEntry(tp));
}
boolean contains(TopicPartition topicPartition) {
@ -53,31 +61,70 @@ class TxnPartitionMap {
OptionalLong lastAckedOffset(TopicPartition topicPartition) {
TxnPartitionEntry entry = topicPartitions.get(topicPartition);
if (entry != null && entry.lastAckedOffset != ProduceResponse.INVALID_OFFSET) {
return OptionalLong.of(entry.lastAckedOffset);
} else {
return OptionalLong.empty();
}
if (entry != null)
return entry.lastAckedOffset();
return OptionalLong.empty();
}
OptionalInt lastAckedSequence(TopicPartition topicPartition) {
TxnPartitionEntry entry = topicPartitions.get(topicPartition);
if (entry != null && entry.lastAckedSequence != TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER) {
return OptionalInt.of(entry.lastAckedSequence);
} else {
return OptionalInt.empty();
}
if (entry != null)
return entry.lastAckedSequence();
return OptionalInt.empty();
}
void startSequencesAtBeginning(TopicPartition topicPartition, ProducerIdAndEpoch newProducerIdAndEpoch) {
final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
TxnPartitionEntry topicPartitionEntry = get(topicPartition);
topicPartitionEntry.resetSequenceNumbers(inFlightBatch -> {
inFlightBatch.resetProducerState(newProducerIdAndEpoch, sequence.value, inFlightBatch.isTransactional());
sequence.value += inFlightBatch.recordCount;
});
topicPartitionEntry.producerIdAndEpoch = newProducerIdAndEpoch;
topicPartitionEntry.nextSequence = sequence.value;
topicPartitionEntry.lastAckedSequence = TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
TxnPartitionEntry entry = get(topicPartition);
if (entry != null)
entry.startSequencesAtBeginning(newProducerIdAndEpoch);
}
void remove(TopicPartition topicPartition) {
topicPartitions.remove(topicPartition);
}
void updateLastAckedOffset(TopicPartition topicPartition, boolean isTransactional, long lastOffset) {
OptionalLong lastAckedOffset = lastAckedOffset(topicPartition);
// It might happen that the TransactionManager has been reset while a request was reenqueued and got a valid
// response for this. This can happen only if the producer is only idempotent (not transactional) and in
// this case there will be no tracked bookkeeper entry about it, so we have to insert one.
if (!lastAckedOffset.isPresent() && !isTransactional)
getOrCreate(topicPartition);
if (lastOffset > lastAckedOffset.orElse(ProduceResponse.INVALID_OFFSET))
get(topicPartition).setLastAckedOffset(lastOffset);
else
log.trace("Partition {} keeps lastOffset at {}", topicPartition, lastOffset);
}
// If a batch is failed fatally, the sequence numbers for future batches bound for the partition must be adjusted
// so that they don't fail with the OutOfOrderSequenceException.
//
// This method must only be called when we know that the batch is question has been unequivocally failed by the broker,
// ie. it has received a confirmed fatal status code like 'Message Too Large' or something similar.
void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
if (!contains(batch.topicPartition))
// Sequence numbers are not being tracked for this partition. This could happen if the producer id was just
// reset due to a previous OutOfOrderSequenceException.
return;
log.debug("producerId: {}, send to partition {} failed fatally. Reducing future sequence numbers by {}",
batch.producerId(), batch.topicPartition, batch.recordCount);
get(batch.topicPartition).adjustSequencesDueToFailedBatch(batch.baseSequence(), batch.recordCount);
}
int maybeUpdateLastAckedSequence(TopicPartition topicPartition, int sequence) {
TxnPartitionEntry entry = topicPartitions.get(topicPartition);
if (entry != null)
return entry.maybeUpdateLastAckedSequence(sequence);
return TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER;
}
ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
return get(topicPartition).nextBatchBySequence();
}
void removeInFlightBatch(ProducerBatch batch) {
get(batch.topicPartition).removeInFlightBatch(batch);
}
}

View File

@ -35,7 +35,7 @@ public class ProducerIdAndEpoch {
@Override
public String toString() {
return "ProducerIdAndEpoch(producerId=" + producerId + ", epoch=" + epoch + ")";
return "(producerId=" + producerId + ", epoch=" + epoch + ")";
}
@Override

View File

@ -778,7 +778,7 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@ -786,14 +786,14 @@ public class SenderTest {
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertFalse(request1.isDone());
assertFalse(request2.isDone());
@ -828,7 +828,7 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@ -836,7 +836,7 @@ public class SenderTest {
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
@ -848,7 +848,7 @@ public class SenderTest {
sender.runOnce();
assertEquals(3, client.inFlightRequestCount());
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(3, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertFalse(request1.isDone());
assertFalse(request2.isDone());
@ -875,7 +875,7 @@ public class SenderTest {
sender.runOnce(); // Do nothing, we are reduced to one in flight request during retries.
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue()); // the batch for request 4 shouldn't have been drained, and hence the sequence should not have been incremented.
assertEquals(3, transactionManager.sequenceNumber(tp0)); // the batch for request 4 shouldn't have been drained, and hence the sequence should not have been incremented.
assertEquals(1, client.inFlightRequestCount());
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
@ -928,7 +928,7 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@ -936,14 +936,14 @@ public class SenderTest {
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertFalse(request1.isDone());
assertFalse(request2.isDone());
@ -987,7 +987,7 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest with multiple messages.
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@ -998,7 +998,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
// make sure the next sequence number accounts for multi-message batches.
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0);
@ -1008,7 +1008,7 @@ public class SenderTest {
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(3, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertTrue(request1.isDone());
assertEquals(0, request1.get().offset());
@ -1023,7 +1023,7 @@ public class SenderTest {
// epoch should be bumped and sequence numbers reset
assertEquals(1, transactionManager.producerIdAndEpoch().epoch);
assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(0, transactionManager.firstInFlightSequence(tp0));
}
@ -1249,7 +1249,7 @@ public class SenderTest {
) {
assertEquals(expectedProducerId, transactionManager.producerIdAndEpoch(tp).producerId, "Producer Id:");
assertEquals(expectedProducerEpoch, transactionManager.producerIdAndEpoch(tp).epoch, "Producer Epoch:");
assertEquals(expectedSequenceValue, transactionManager.sequenceNumber(tp).longValue(), "Seq Number:");
assertEquals(expectedSequenceValue, transactionManager.sequenceNumber(tp), "Seq Number:");
assertEquals(expectedLastAckedSequence, transactionManager.lastAckedSequence(tp), "Last Acked Seq Number:");
}
@ -1260,7 +1260,7 @@ public class SenderTest {
setupWithTransactionState(transactionManager);
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@ -1268,14 +1268,14 @@ public class SenderTest {
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertFalse(request1.isDone());
assertFalse(request2.isDone());
@ -1342,7 +1342,7 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@ -1410,7 +1410,7 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0, 0L, "key", "value");
@ -1432,7 +1432,7 @@ public class SenderTest {
setupWithTransactionState(transactionManager, false, null);
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@ -1480,7 +1480,7 @@ public class SenderTest {
assertEquals(1, batches.size());
assertFalse(batches.peekFirst().hasSequence());
assertFalse(client.hasInFlightRequests());
assertEquals(2L, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2L, transactionManager.sequenceNumber(tp0));
assertTrue(transactionManager.hasUnresolvedSequence(tp0));
sender.runOnce(); // clear the unresolved state, send the pending request.
@ -1499,7 +1499,7 @@ public class SenderTest {
setupWithTransactionState(transactionManager);
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@ -1538,7 +1538,7 @@ public class SenderTest {
sender.runOnce();
assertEquals((short) 1, transactionManager.producerIdAndEpoch().epoch);
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertFalse(transactionManager.hasUnresolvedSequence(tp0));
}
@ -1591,7 +1591,7 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0, 0L, "key", "value");
@ -1599,7 +1599,7 @@ public class SenderTest {
sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1);
sender.runOnce(); // receive response
assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1L, transactionManager.sequenceNumber(tp0));
Node node = metadata.fetch().nodes().get(0);
time.sleep(15000L);
@ -1660,7 +1660,7 @@ public class SenderTest {
assertEquals(10, successfulResponse.get().offset());
// The epoch and the sequence are updated when the next batch is sent.
assertEquals(1, transactionManager.sequenceNumber(tp1).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp1));
}
@Test
@ -1772,7 +1772,7 @@ public class SenderTest {
sender.runOnce();
assertTrue(successfulResponse.isDone());
// epoch of partition is bumped and sequence is reset when the next batch is sent
assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
assertEquals(1, transactionManager.sequenceNumber(tp1));
}
@Test
@ -1783,7 +1783,7 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
@ -1791,14 +1791,14 @@ public class SenderTest {
String nodeId = client.requests().peek().destination();
Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
// Send second ProduceRequest
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, client.inFlightRequestCount());
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertFalse(request1.isDone());
assertFalse(request2.isDone());
@ -1842,14 +1842,14 @@ public class SenderTest {
client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
sender.runOnce(); // Receive AddPartitions response
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@ -1865,7 +1865,7 @@ public class SenderTest {
appendToAccumulator(tp0);
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(3, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertFalse(request2.isDone());
@ -1875,7 +1875,7 @@ public class SenderTest {
// We should have reset the sequence number state of the partition because the state was lost on the broker.
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertFalse(request2.isDone());
assertFalse(client.hasInFlightRequests());
@ -1885,7 +1885,7 @@ public class SenderTest {
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
sender.runOnce(); // receive response 1
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertFalse(client.hasInFlightRequests());
assertTrue(request2.isDone());
assertEquals(1012L, request2.get().offset());
@ -1900,14 +1900,14 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@ -1923,7 +1923,7 @@ public class SenderTest {
appendToAccumulator(tp0);
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(3, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertFalse(request2.isDone());
@ -1934,7 +1934,7 @@ public class SenderTest {
// We should have reset the sequence number state of the partition because the state was lost on the broker.
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertFalse(request2.isDone());
assertTrue(client.hasInFlightRequests());
assertEquals((short) 1, transactionManager.producerIdAndEpoch().epoch);
@ -1943,7 +1943,7 @@ public class SenderTest {
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
sender.runOnce(); // receive response 1
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertFalse(client.hasInFlightRequests());
assertTrue(request2.isDone());
assertEquals(1012L, request2.get().offset());
@ -1958,14 +1958,14 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@ -1980,7 +1980,7 @@ public class SenderTest {
// Send second ProduceRequest
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertFalse(request2.isDone());
@ -1990,7 +1990,7 @@ public class SenderTest {
// We should have reset the sequence number state of the partition because the state was lost on the broker.
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertFalse(request2.isDone());
assertFalse(client.hasInFlightRequests());
@ -2001,7 +2001,7 @@ public class SenderTest {
sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1011L, 1010L);
sender.runOnce(); // receive response 1
assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertFalse(client.hasInFlightRequests());
assertTrue(request2.isDone());
assertEquals(1011L, request2.get().offset());
@ -2016,14 +2016,14 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@ -2038,14 +2038,14 @@ public class SenderTest {
// Send second ProduceRequest
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
// Send the third ProduceRequest, in parallel with the second. It should be retried even though the
// lastAckedOffset > logStartOffset when its UnknownProducerResponse comes back.
Future<RecordMetadata> request3 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(3, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(3, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertFalse(request2.isDone());
@ -2058,7 +2058,7 @@ public class SenderTest {
// We should have reset the sequence number state of the partition because the state was lost on the broker.
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertFalse(request2.isDone());
assertFalse(request3.isDone());
assertEquals(2, client.inFlightRequestCount());
@ -2070,7 +2070,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
sender.runOnce(); // receive response 2, don't send request 3 since we can have at most 1 in flight when retrying
@ -2102,14 +2102,14 @@ public class SenderTest {
prepareAndReceiveInitProducerId(producerId, Errors.NONE);
assertTrue(transactionManager.hasProducerId());
assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
@ -2124,7 +2124,7 @@ public class SenderTest {
// Send second ProduceRequest,
Future<RecordMetadata> request2 = appendToAccumulator(tp0);
sender.runOnce();
assertEquals(2, transactionManager.sequenceNumber(tp0).longValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(tp0));
assertFalse(request2.isDone());
@ -2432,7 +2432,7 @@ public class SenderTest {
sender.runOnce(); // connect
sender.runOnce(); // send produce request
assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence should be 2");
assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence should be 2");
String id = client.requests().peek().destination();
assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey());
Node node = new Node(Integer.parseInt(id), "localhost", 0);
@ -2443,12 +2443,12 @@ public class SenderTest {
responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
client.respond(new ProduceResponse(responseMap));
sender.runOnce(); // split and reenqueue
assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence should be 2");
assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence should be 2");
// The compression ratio should have been improved once.
assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP,
CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01);
sender.runOnce(); // send the first produce request
assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence number should be 2");
assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence number should be 2");
assertFalse(f1.isDone(), "The future shouldn't have been done.");
assertFalse(f2.isDone(), "The future shouldn't have been done.");
id = client.requests().peek().destination();
@ -2463,7 +2463,7 @@ public class SenderTest {
sender.runOnce(); // receive
assertTrue(f1.isDone(), "The future should have been done.");
assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence number should still be 2");
assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence number should still be 2");
assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp), "The last ack'd sequence number should be 0");
assertFalse(f2.isDone(), "The future shouldn't have been done.");
assertEquals(0L, f1.get().offset(), "Offset of the first message should be 0");
@ -2480,7 +2480,7 @@ public class SenderTest {
sender.runOnce(); // receive
assertTrue(f2.isDone(), "The future should have been done.");
assertEquals(2, txnManager.sequenceNumber(tp).longValue(), "The next sequence number should be 2");
assertEquals(2, txnManager.sequenceNumber(tp), "The next sequence number should be 2");
assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp), "The last ack'd sequence number should be 1");
assertEquals(1L, f2.get().offset(), "Offset of the first message should be 1");
assertTrue(accumulator.getDeque(tp).isEmpty(), "There should be no batch in the accumulator");

View File

@ -564,9 +564,9 @@ public class TransactionManagerTest {
@Test
public void testDefaultSequenceNumber() {
initializeTransactionManager(Optional.empty());
assertEquals((int) transactionManager.sequenceNumber(tp0), 0);
assertEquals(transactionManager.sequenceNumber(tp0), 0);
transactionManager.incrementSequenceNumber(tp0, 3);
assertEquals((int) transactionManager.sequenceNumber(tp0), 3);
assertEquals(transactionManager.sequenceNumber(tp0), 3);
}
@Test
@ -579,7 +579,7 @@ public class TransactionManagerTest {
ProducerBatch b3 = writeIdempotentBatchWithValue(transactionManager, tp0, "3");
ProducerBatch b4 = writeIdempotentBatchWithValue(transactionManager, tp0, "4");
ProducerBatch b5 = writeIdempotentBatchWithValue(transactionManager, tp0, "5");
assertEquals(5, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(5, transactionManager.sequenceNumber(tp0));
// First batch succeeds
long b1AppendTime = time.milliseconds();
@ -624,8 +624,8 @@ public class TransactionManagerTest {
ProducerBatch tp0b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
ProducerBatch tp1b2 = writeIdempotentBatchWithValue(transactionManager, tp1, "2");
assertEquals(2, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(2, transactionManager.sequenceNumber(tp1).intValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp1));
ProduceResponse.PartitionResponse b1Response = new ProduceResponse.PartitionResponse(
Errors.UNKNOWN_PRODUCER_ID, -1, -1, 400L);
@ -637,9 +637,9 @@ public class TransactionManagerTest {
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
assertEquals(tp0b2, transactionManager.nextBatchBySequence(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp1).intValue());
assertEquals(2, transactionManager.sequenceNumber(tp1));
assertEquals(tp1b2, transactionManager.nextBatchBySequence(tp1));
}
@ -654,7 +654,7 @@ public class TransactionManagerTest {
writeIdempotentBatchWithValue(transactionManager, tp1, "1");
ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
assertEquals(2, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
// The producerId might be reset due to a failure on another partition
transactionManager.requestEpochBumpForPartition(tp1);
@ -666,7 +666,7 @@ public class TransactionManagerTest {
Errors.NONE, 500L, time.milliseconds(), 0L);
transactionManager.handleCompletedBatch(b1, b1Response);
assertEquals(2, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
assertEquals(0, transactionManager.lastAckedSequence(tp0).getAsInt());
assertEquals(b2, transactionManager.nextBatchBySequence(tp0));
assertEquals(epoch, transactionManager.nextBatchBySequence(tp0).producerEpoch());
@ -676,7 +676,7 @@ public class TransactionManagerTest {
transactionManager.handleCompletedBatch(b2, b2Response);
transactionManager.maybeUpdateProducerIdAndEpoch(tp0);
assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
assertFalse(transactionManager.lastAckedSequence(tp0).isPresent());
assertNull(transactionManager.nextBatchBySequence(tp0));
}
@ -698,23 +698,23 @@ public class TransactionManagerTest {
MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(metrics), this.time, requestTimeout,
0, transactionManager, apiVersions);
assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
Future<RecordMetadata> responseFuture1 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(),
"1".getBytes(), "1".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(),
TestUtils.singletonCluster()).future;
sender.runOnce();
assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
time.sleep(requestTimeout);
sender.runOnce();
assertEquals(0, client.inFlightRequestCount());
assertTrue(transactionManager.hasInflightBatches(tp0));
assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
sender.runOnce(); // retry
assertEquals(1, client.inFlightRequestCount());
assertTrue(transactionManager.hasInflightBatches(tp0));
assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
time.sleep(5000); // delivery time out
sender.runOnce();
@ -729,7 +729,7 @@ public class TransactionManagerTest {
sender.runOnce(); // bump the epoch
assertEquals(epoch + 1, transactionManager.producerIdAndEpoch().epoch);
assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
Future<RecordMetadata> responseFuture2 = accumulator.append(tp0.topic(), tp0.partition(), time.milliseconds(),
"2".getBytes(), "2".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT, false, time.milliseconds(),
@ -737,7 +737,7 @@ public class TransactionManagerTest {
sender.runOnce();
sender.runOnce();
assertEquals(0, transactionManager.firstInFlightSequence(tp0));
assertEquals(1, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(1, transactionManager.sequenceNumber(tp0));
time.sleep(5000); // request time out again
sender.runOnce();
@ -770,30 +770,30 @@ public class TransactionManagerTest {
@Test
public void testSequenceNumberOverflow() {
initializeTransactionManager(Optional.empty());
assertEquals((int) transactionManager.sequenceNumber(tp0), 0);
assertEquals(transactionManager.sequenceNumber(tp0), 0);
transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
assertEquals((int) transactionManager.sequenceNumber(tp0), Integer.MAX_VALUE);
assertEquals(transactionManager.sequenceNumber(tp0), Integer.MAX_VALUE);
transactionManager.incrementSequenceNumber(tp0, 100);
assertEquals((int) transactionManager.sequenceNumber(tp0), 99);
assertEquals(transactionManager.sequenceNumber(tp0), 99);
transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE);
assertEquals((int) transactionManager.sequenceNumber(tp0), 98);
assertEquals(transactionManager.sequenceNumber(tp0), 98);
}
@Test
public void testProducerIdReset() {
initializeTransactionManager(Optional.empty());
initializeIdempotentProducerId(15L, Short.MAX_VALUE);
assertEquals((int) transactionManager.sequenceNumber(tp0), 0);
assertEquals((int) transactionManager.sequenceNumber(tp1), 0);
assertEquals(transactionManager.sequenceNumber(tp0), 0);
assertEquals(transactionManager.sequenceNumber(tp1), 0);
transactionManager.incrementSequenceNumber(tp0, 3);
assertEquals((int) transactionManager.sequenceNumber(tp0), 3);
assertEquals(transactionManager.sequenceNumber(tp0), 3);
transactionManager.incrementSequenceNumber(tp1, 3);
assertEquals((int) transactionManager.sequenceNumber(tp1), 3);
assertEquals(transactionManager.sequenceNumber(tp1), 3);
transactionManager.requestEpochBumpForPartition(tp0);
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
assertEquals((int) transactionManager.sequenceNumber(tp0), 0);
assertEquals((int) transactionManager.sequenceNumber(tp1), 3);
assertEquals(transactionManager.sequenceNumber(tp0), 0);
assertEquals(transactionManager.sequenceNumber(tp1), 3);
}
@Test
@ -2835,7 +2835,7 @@ public class TransactionManagerTest {
ProducerBatch b1 = writeIdempotentBatchWithValue(transactionManager, tp0, "1");
ProducerBatch b2 = writeIdempotentBatchWithValue(transactionManager, tp0, "2");
ProducerBatch b3 = writeIdempotentBatchWithValue(transactionManager, tp0, "3");
assertEquals(3, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(3, transactionManager.sequenceNumber(tp0));
// The first batch fails with a timeout
transactionManager.markSequenceUnresolved(b1);
@ -2860,7 +2860,7 @@ public class TransactionManagerTest {
transactionManager.maybeResolveSequences();
assertEquals(producerIdAndEpoch, transactionManager.producerIdAndEpoch());
assertFalse(transactionManager.hasUnresolvedSequences());
assertEquals(3, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(3, transactionManager.sequenceNumber(tp0));
}
@Test
@ -2893,7 +2893,7 @@ public class TransactionManagerTest {
// Run sender loop to trigger epoch bump
runUntil(() -> transactionManager.producerIdAndEpoch().epoch == 2);
assertFalse(transactionManager.hasUnresolvedSequences());
assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
}
@Test
@ -2966,7 +2966,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
runUntil(() -> transactionManager.isPartitionAdded(tp0)); // Send AddPartitionsRequest
assertEquals(2, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(2, transactionManager.sequenceNumber(tp0));
}
@Test
@ -3026,8 +3026,8 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
runUntil(() -> transactionManager.isPartitionAdded(tp0));
assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
assertEquals(1, transactionManager.sequenceNumber(tp1));
}
@Test
@ -3073,7 +3073,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, producerId);
runUntil(() -> transactionManager.isPartitionAdded(tp0));
assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
}
@Test
@ -3120,7 +3120,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, producerId);
runUntil(() -> transactionManager.isPartitionAdded(tp0));
assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
}
@Test
@ -3179,7 +3179,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, bumpedEpoch, producerId);
runUntil(() -> transactionManager.isPartitionAdded(tp0));
assertEquals(0, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(0, transactionManager.sequenceNumber(tp0));
}
@Test
@ -3249,8 +3249,8 @@ public class TransactionManagerTest {
writeIdempotentBatchWithValue(transactionManager, tp0, "3");
ProducerBatch tp1b1 = writeIdempotentBatchWithValue(transactionManager, tp1, "4");
ProducerBatch tp1b2 = writeIdempotentBatchWithValue(transactionManager, tp1, "5");
assertEquals(3, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(2, transactionManager.sequenceNumber(tp1).intValue());
assertEquals(3, transactionManager.sequenceNumber(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp1));
// First batch of each partition succeeds
long b1AppendTime = time.milliseconds();
@ -3311,7 +3311,7 @@ public class TransactionManagerTest {
transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
assertFalse(transactionManager.hasInflightBatches(tp1));
assertEquals(0, transactionManager.sequenceNumber(tp1).intValue());
assertEquals(0, transactionManager.sequenceNumber(tp1));
// The last batch should now be drained and sent
runUntil(() -> transactionManager.hasInflightBatches(tp1));
@ -3326,7 +3326,7 @@ public class TransactionManagerTest {
transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
assertFalse(transactionManager.hasInflightBatches(tp1));
assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
assertEquals(1, transactionManager.sequenceNumber(tp1));
}
@Test
@ -3373,8 +3373,8 @@ public class TransactionManagerTest {
writeIdempotentBatchWithValue(transactionManager, tp0, "3");
ProducerBatch tp1b1 = writeIdempotentBatchWithValue(transactionManager, tp1, "4");
ProducerBatch tp1b2 = writeIdempotentBatchWithValue(transactionManager, tp1, "5");
assertEquals(3, transactionManager.sequenceNumber(tp0).intValue());
assertEquals(2, transactionManager.sequenceNumber(tp1).intValue());
assertEquals(3, transactionManager.sequenceNumber(tp0));
assertEquals(2, transactionManager.sequenceNumber(tp1));
// First batch of each partition succeeds
long b1AppendTime = time.milliseconds();
@ -3435,7 +3435,7 @@ public class TransactionManagerTest {
transactionManager.maybeUpdateProducerIdAndEpoch(tp1);
assertFalse(transactionManager.hasInflightBatches(tp1));
assertEquals(0, transactionManager.sequenceNumber(tp1).intValue());
assertEquals(0, transactionManager.sequenceNumber(tp1));
// The last batch should now be drained and sent
runUntil(() -> transactionManager.hasInflightBatches(tp1));
@ -3449,7 +3449,7 @@ public class TransactionManagerTest {
transactionManager.handleCompletedBatch(tp1b3, t1b3Response);
assertFalse(transactionManager.hasInflightBatches(tp1));
assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
assertEquals(1, transactionManager.sequenceNumber(tp1));
}
@Test