KAFKA-6361: Fix log divergence between leader and follower after fast leader fail over (#4882)

Implementation of KIP-279 as described here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over

In summary:
- Added leader_epoch to OFFSET_FOR_LEADER_EPOCH_RESPONSE
- The leader replies with the pair (largest epoch less than or equal to the requested epoch, end offset of that epoch)
- If the follower does not know about the leader epoch in the leader's reply, it truncates to the end offset of its largest leader epoch that is smaller than the epoch the leader replied with, and sends another OffsetForLeaderEpoch request containing that smaller leader epoch (the decision logic is sketched below).
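
To make the follower-side decision concrete, here is a minimal Scala sketch of the logic that AbstractFetcherThread.getOffsetTruncationState implements in the diff below (illustrative names and a simplified lookup signature, not the actual Kafka code; the UNDEFINED_EPOCH/UNDEFINED_EPOCH_OFFSET fallbacks handled by the real implementation are omitted):

object Kip279TruncationSketch {
  // What the leader returns for the follower's requested epoch
  final case class EpochEndOffset(leaderEpoch: Int, endOffset: Long)

  // Where to truncate, and whether another OffsetsForLeaderEpoch round is needed
  final case class TruncationDecision(truncateTo: Long, needAnotherRequest: Boolean)

  def decide(reply: EpochEndOffset,
             localEndOffsetFor: Int => (Int, Long), // largest local epoch <= arg, and its end offset
             localLogEndOffset: Long): TruncationDecision = {
    val (localEpoch, localEndOffset) = localEndOffsetFor(reply.leaderEpoch)
    if (localEpoch == reply.leaderEpoch)
      // Both replicas know this epoch: truncate to the smaller of the two end offsets
      // (bounded by the local log end offset) and finish truncation.
      TruncationDecision(math.min(math.min(reply.endOffset, localEndOffset), localLogEndOffset),
        needAnotherRequest = false)
    else
      // The follower does not know the epoch the leader replied with: truncate to the end
      // offset of its largest smaller epoch and query the leader again with that epoch.
      TruncationDecision(math.min(localEndOffset, localLogEndOffset), needAnotherRequest = true)
  }
}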

Reviewers: Dong Lin <lindong28@gmail.com>, Jun Rao <junrao@gmail.com>
Anna Povzner 2018-05-09 18:49:51 -07:00 committed by Jun Rao
parent 0b1a118f45
commit 9679c44d2b
24 changed files with 750 additions and 198 deletions

View File

@ -26,6 +26,7 @@ public class CommonFields {
public static final Field.Int32 PARTITION_ID = new Field.Int32("partition", "Topic partition id"); public static final Field.Int32 PARTITION_ID = new Field.Int32("partition", "Topic partition id");
public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Response error code"); public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Response error code");
public static final Field.NullableStr ERROR_MESSAGE = new Field.NullableStr("error_message", "Response error message"); public static final Field.NullableStr ERROR_MESSAGE = new Field.NullableStr("error_message", "Response error message");
public static final Field.Int32 LEADER_EPOCH = new Field.Int32("leader_epoch", "The epoch");
// Group APIs // Group APIs
public static final Field.Str GROUP_ID = new Field.Str("group_id", "The unique group identifier"); public static final Field.Str GROUP_ID = new Field.Str("group_id", "The unique group identifier");

View File

@ -20,24 +20,29 @@ import org.apache.kafka.common.protocol.Errors;
import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH; import static org.apache.kafka.common.record.RecordBatch.NO_PARTITION_LEADER_EPOCH;
import java.util.Objects;
/** /**
* The offset, fetched from a leader, for a particular partition. * The offset, fetched from a leader, for a particular partition.
*/ */
public class EpochEndOffset { public class EpochEndOffset {
public static final long UNDEFINED_EPOCH_OFFSET = NO_PARTITION_LEADER_EPOCH; public static final long UNDEFINED_EPOCH_OFFSET = NO_PARTITION_LEADER_EPOCH;
public static final int UNDEFINED_EPOCH = -1; public static final int UNDEFINED_EPOCH = NO_PARTITION_LEADER_EPOCH;
private Errors error; private Errors error;
private int leaderEpoch; // introduced in V1
private long endOffset; private long endOffset;
public EpochEndOffset(Errors error, long endOffset) { public EpochEndOffset(Errors error, int leaderEpoch, long endOffset) {
this.error = error; this.error = error;
this.leaderEpoch = leaderEpoch;
this.endOffset = endOffset; this.endOffset = endOffset;
} }
public EpochEndOffset(long endOffset) { public EpochEndOffset(int leaderEpoch, long endOffset) {
this.error = Errors.NONE; this.error = Errors.NONE;
this.leaderEpoch = leaderEpoch;
this.endOffset = endOffset; this.endOffset = endOffset;
} }
@ -53,10 +58,15 @@ public class EpochEndOffset {
return endOffset; return endOffset;
} }
public int leaderEpoch() {
return leaderEpoch;
}
@Override @Override
public String toString() { public String toString() {
return "EpochEndOffset{" + return "EpochEndOffset{" +
"error=" + error + "error=" + error +
", leaderEpoch=" + leaderEpoch +
", endOffset=" + endOffset + ", endOffset=" + endOffset +
'}'; '}';
} }
@ -68,14 +78,13 @@ public class EpochEndOffset {
EpochEndOffset that = (EpochEndOffset) o; EpochEndOffset that = (EpochEndOffset) o;
if (error != that.error) return false; return Objects.equals(error, that.error)
return endOffset == that.endOffset; && Objects.equals(leaderEpoch, that.leaderEpoch)
&& Objects.equals(endOffset, that.endOffset);
} }
@Override @Override
public int hashCode() { public int hashCode() {
int result = (int) error.code(); return Objects.hash(error, leaderEpoch, endOffset);
result = 31 * result + (int) (endOffset ^ (endOffset >>> 32));
return result;
} }
} }

View File

@ -50,8 +50,11 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V0 = new Schema( private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V0 = new Schema(
new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0), "An array of topics to get epochs for")); new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0), "An array of topics to get epochs for"));
/* v1 request is the same as v0. Per-partition leader epoch has been added to response */
private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V1 = OFFSET_FOR_LEADER_EPOCH_REQUEST_V0;
public static Schema[] schemaVersions() { public static Schema[] schemaVersions() {
return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0}; return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0, OFFSET_FOR_LEADER_EPOCH_REQUEST_V1};
} }
private Map<TopicPartition, Integer> epochsByPartition; private Map<TopicPartition, Integer> epochsByPartition;
@ -63,12 +66,12 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<OffsetsForLeaderEpochRequest> { public static class Builder extends AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
private Map<TopicPartition, Integer> epochsByPartition = new HashMap<>(); private Map<TopicPartition, Integer> epochsByPartition = new HashMap<>();
public Builder() { public Builder(short version) {
super(ApiKeys.OFFSET_FOR_LEADER_EPOCH); super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
} }
public Builder(Map<TopicPartition, Integer> epochsByPartition) { public Builder(short version, Map<TopicPartition, Integer> epochsByPartition) {
super(ApiKeys.OFFSET_FOR_LEADER_EPOCH); super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
this.epochsByPartition = epochsByPartition; this.epochsByPartition = epochsByPartition;
} }
@ -150,7 +153,8 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
Errors error = Errors.forException(e); Errors error = Errors.forException(e);
Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap<>(); Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap<>();
for (TopicPartition tp : epochsByPartition.keySet()) { for (TopicPartition tp : epochsByPartition.keySet()) {
errorResponse.put(tp, new EpochEndOffset(error, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)); errorResponse.put(tp, new EpochEndOffset(
error, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
} }
return new OffsetsForLeaderEpochResponse(errorResponse); return new OffsetsForLeaderEpochResponse(errorResponse);
} }

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.utils.CollectionUtils; import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -34,6 +35,7 @@ import java.util.Map;
import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
import static org.apache.kafka.common.protocol.CommonFields.LEADER_EPOCH;
import static org.apache.kafka.common.protocol.types.Type.INT64; import static org.apache.kafka.common.protocol.types.Type.INT64;
public class OffsetsForLeaderEpochResponse extends AbstractResponse { public class OffsetsForLeaderEpochResponse extends AbstractResponse {
@ -52,8 +54,23 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0), new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0),
"An array of topics for which we have leader offsets for some requested Partition Leader Epoch")); "An array of topics for which we have leader offsets for some requested Partition Leader Epoch"));
// OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 added a per-partition leader epoch field,
// which specifies which leader epoch the end offset belongs to
private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1 = new Schema(
ERROR_CODE,
PARTITION_ID,
LEADER_EPOCH,
new Field(END_OFFSET_KEY_NAME, INT64, "The end offset"));
private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V1 = new Schema(
TOPIC_NAME,
new Field(PARTITIONS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V1)));
private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1 = new Schema(
new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V1),
"An array of topics for which we have leader offsets for some requested Partition Leader Epoch"));
public static Schema[] schemaVersions() { public static Schema[] schemaVersions() {
return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0}; return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1};
} }
private Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition; private Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition;
@ -68,8 +85,9 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
Errors error = Errors.forCode(partitionAndEpoch.get(ERROR_CODE)); Errors error = Errors.forCode(partitionAndEpoch.get(ERROR_CODE));
int partitionId = partitionAndEpoch.get(PARTITION_ID); int partitionId = partitionAndEpoch.get(PARTITION_ID);
TopicPartition tp = new TopicPartition(topic, partitionId); TopicPartition tp = new TopicPartition(topic, partitionId);
int leaderEpoch = partitionAndEpoch.getOrElse(LEADER_EPOCH, RecordBatch.NO_PARTITION_LEADER_EPOCH);
long endOffset = partitionAndEpoch.getLong(END_OFFSET_KEY_NAME); long endOffset = partitionAndEpoch.getLong(END_OFFSET_KEY_NAME);
epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, endOffset)); epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, leaderEpoch, endOffset));
} }
} }
} }
@ -110,6 +128,7 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME); Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME);
partitionStruct.set(ERROR_CODE, partitionEndOffset.getValue().error().code()); partitionStruct.set(ERROR_CODE, partitionEndOffset.getValue().error().code());
partitionStruct.set(PARTITION_ID, partitionEndOffset.getKey()); partitionStruct.set(PARTITION_ID, partitionEndOffset.getKey());
partitionStruct.setIfExists(LEADER_EPOCH, partitionEndOffset.getValue().leaderEpoch());
partitionStruct.set(END_OFFSET_KEY_NAME, partitionEndOffset.getValue().endOffset()); partitionStruct.set(END_OFFSET_KEY_NAME, partitionEndOffset.getValue().endOffset());
partitions.add(partitionStruct); partitions.add(partitionStruct);
} }

View File

@ -1020,15 +1020,15 @@ public class RequestResponseTest {
epochs.put(new TopicPartition("topic1", 1), 1); epochs.put(new TopicPartition("topic1", 1), 1);
epochs.put(new TopicPartition("topic2", 2), 3); epochs.put(new TopicPartition("topic2", 2), 3);
return new OffsetsForLeaderEpochRequest.Builder(epochs).build(); return new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), epochs).build();
} }
private OffsetsForLeaderEpochResponse createLeaderEpochResponse() { private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
Map<TopicPartition, EpochEndOffset> epochs = new HashMap<>(); Map<TopicPartition, EpochEndOffset> epochs = new HashMap<>();
epochs.put(new TopicPartition("topic1", 0), new EpochEndOffset(Errors.NONE, 0)); epochs.put(new TopicPartition("topic1", 0), new EpochEndOffset(Errors.NONE, 1, 0));
epochs.put(new TopicPartition("topic1", 1), new EpochEndOffset(Errors.NONE, 1)); epochs.put(new TopicPartition("topic1", 1), new EpochEndOffset(Errors.NONE, 1, 1));
epochs.put(new TopicPartition("topic2", 2), new EpochEndOffset(Errors.NONE, 2)); epochs.put(new TopicPartition("topic2", 2), new EpochEndOffset(Errors.NONE, 1, 2));
return new OffsetsForLeaderEpochResponse(epochs); return new OffsetsForLeaderEpochResponse(epochs);
} }

View File

@ -77,6 +77,7 @@ object ApiVersion {
// and KafkaStorageException for fetch requests. // and KafkaStorageException for fetch requests.
"1.1-IV0" -> KAFKA_1_1_IV0, "1.1-IV0" -> KAFKA_1_1_IV0,
"1.1" -> KAFKA_1_1_IV0, "1.1" -> KAFKA_1_1_IV0,
// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279
"2.0-IV0" -> KAFKA_2_0_IV0, "2.0-IV0" -> KAFKA_2_0_IV0,
"2.0" -> KAFKA_2_0_IV0 "2.0" -> KAFKA_2_0_IV0
) )

View File

@ -647,15 +647,20 @@ class Partition(val topic: String,
/** /**
* @param leaderEpoch Requested leader epoch * @param leaderEpoch Requested leader epoch
* @return The last offset of messages published under this leader epoch. * @return The requested leader epoch and the end offset of this leader epoch, or if the requested
* leader epoch is unknown, the leader epoch less than the requested leader epoch and the end offset
* of this leader epoch. The end offset of a leader epoch is defined as the start
* offset of the first leader epoch larger than the leader epoch, or else the log end
* offset if the leader epoch is the latest leader epoch.
*/ */
def lastOffsetForLeaderEpoch(leaderEpoch: Int): EpochEndOffset = { def lastOffsetForLeaderEpoch(leaderEpoch: Int): EpochEndOffset = {
inReadLock(leaderIsrUpdateLock) { inReadLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match { leaderReplicaIfLocal match {
case Some(leaderReplica) => case Some(leaderReplica) =>
new EpochEndOffset(NONE, leaderReplica.epochs.get.endOffsetFor(leaderEpoch)) val (epoch, offset) = leaderReplica.epochs.get.endOffsetFor(leaderEpoch)
new EpochEndOffset(NONE, epoch, offset)
case None => case None =>
new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET) new EpochEndOffset(NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} }
} }
} }

View File

@ -20,7 +20,7 @@ package kafka.consumer
import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest, Request} import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest, Request}
import kafka.cluster.BrokerEndPoint import kafka.cluster.BrokerEndPoint
import kafka.message.ByteBufferMessageSet import kafka.message.ByteBufferMessageSet
import kafka.server.{AbstractFetcherThread, PartitionFetchState} import kafka.server.{AbstractFetcherThread, PartitionFetchState, OffsetTruncationState}
import AbstractFetcherThread.ResultWithPartitions import AbstractFetcherThread.ResultWithPartitions
import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.common.{ErrorMapping, TopicAndPartition}
@ -129,7 +129,7 @@ class ConsumerFetcherThread(consumerIdString: String,
override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() } override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() }
override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = { override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
ResultWithPartitions(Map(), Set()) ResultWithPartitions(Map(), Set())
} }
} }

View File

@ -19,9 +19,10 @@ package kafka.server
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import kafka.cluster.BrokerEndPoint import kafka.cluster.{Replica, BrokerEndPoint}
import kafka.utils.{DelayedItem, Pool, ShutdownableThread} import kafka.utils.{DelayedItem, Pool, ShutdownableThread}
import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException} import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
import org.apache.kafka.common.requests.EpochEndOffset._
import kafka.common.{ClientIdAndBroker, KafkaException} import kafka.common.{ClientIdAndBroker, KafkaException}
import kafka.metrics.KafkaMetricsGroup import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.inLock import kafka.utils.CoreUtils.inLock
@ -39,6 +40,8 @@ import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.EpochEndOffset import org.apache.kafka.common.requests.EpochEndOffset
import scala.math._
/** /**
* Abstract class for fetching data from multiple partitions from the same broker. * Abstract class for fetching data from multiple partitions from the same broker.
*/ */
@ -76,7 +79,7 @@ abstract class AbstractFetcherThread(name: String,
protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset]
protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]]
protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[REQ] protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[REQ]
@ -132,7 +135,7 @@ abstract class AbstractFetcherThread(name: String,
val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) } val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }
val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncate(leaderEpochs) val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncate(leaderEpochs)
handlePartitionsWithErrors(partitionsWithError) handlePartitionsWithErrors(partitionsWithError)
markTruncationCompleteAndUpdateFetchOffset(fetchOffsets) updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
} }
} }
} }
@ -272,15 +275,16 @@ abstract class AbstractFetcherThread(name: String,
} }
/** /**
* Loop through all partitions, marking them as truncation complete and update the fetch offset * Loop through all partitions, updating their fetch offset and maybe marking them as
* truncation completed if their offsetTruncationState indicates truncation completed
* *
* @param fetchOffsets the partitions to mark truncation complete * @param fetchOffsets the partitions to update fetch offset and maybe mark truncation complete
*/ */
private def markTruncationCompleteAndUpdateFetchOffset(fetchOffsets: Map[TopicPartition, Long]) { private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]) {
val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStates.asScala val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStates.asScala
.map { state => .map { state =>
val maybeTruncationComplete = fetchOffsets.get(state.topicPartition()) match { val maybeTruncationComplete = fetchOffsets.get(state.topicPartition()) match {
case Some(offset) => PartitionFetchState(offset, state.value.delay, truncatingLog = false) case Some(offsetTruncationState) => PartitionFetchState(offsetTruncationState.offset, state.value.delay, truncatingLog = !offsetTruncationState.truncationCompleted)
case None => state.value() case None => state.value()
} }
(state.topicPartition(), maybeTruncationComplete) (state.topicPartition(), maybeTruncationComplete)
@ -288,6 +292,79 @@ abstract class AbstractFetcherThread(name: String,
partitionStates.set(newStates.asJava) partitionStates.set(newStates.asJava)
} }
/**
* Called from ReplicaFetcherThread and ReplicaAlterLogDirsThread maybeTruncate for each topic
* partition. Returns truncation offset and whether this is the final offset to truncate to
*
* For each topic partition, the offset to truncate to is calculated based on leader's returned
* epoch and offset:
* -- If the leader replied with undefined epoch offset, we must use the high watermark. This can
* happen if 1) the leader is still using message format older than KAFKA_0_11_0; 2) the follower
* requested leader epoch < the first leader epoch known to the leader.
* -- If the leader replied with the valid offset but undefined leader epoch, we truncate to
* leader's offset if it is lower than follower's Log End Offset. This may happen if the
* leader is on the inter-broker protocol version < KAFKA_2_0_IV0
* -- If the leader replied with leader epoch not known to the follower, we truncate to the
* end offset of the largest epoch that is smaller than the epoch the leader replied with, and
* send OffsetsForLeaderEpochRequest with that leader epoch. In a more rare case, where the
* follower was not tracking epochs smaller than the epoch the leader replied with, we
* truncate to the leader's offset (and do not send any more leader epoch requests).
* -- Otherwise, truncate to min(leader's offset, end offset on the follower for epoch that
* leader replied with, follower's Log End Offset).
*
* @param tp Topic partition
* @param leaderEpochOffset Epoch end offset received from the leader for this topic partition
* @param replica Follower's replica, which is either local replica
* (ReplicaFetcherThread) or future replica (ReplicaAlterLogDirsThread)
* @param isFutureReplica true if called from ReplicaAlterLogDirsThread
*/
def getOffsetTruncationState(tp: TopicPartition, leaderEpochOffset: EpochEndOffset, replica: Replica, isFutureReplica: Boolean = false): OffsetTruncationState = {
// to make sure we can distinguish log output for fetching from remote leader or local replica
val followerName = if (isFutureReplica) "future replica" else "follower"
if (leaderEpochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) {
// truncate to initial offset which is the high watermark for follower replica. For
// future replica, it is either high watermark of the future replica or current
// replica's truncation offset (when the current replica truncates, it forces future
// replica's partition state to 'truncating' and sets initial offset to its truncation offset)
warn(s"Based on $followerName's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " +
s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
OffsetTruncationState(partitionStates.stateValue(tp).fetchOffset, truncationCompleted = true)
} else if (leaderEpochOffset.leaderEpoch == UNDEFINED_EPOCH) {
// either leader or follower or both use inter-broker protocol version < KAFKA_2_0_IV0
// (version 0 of OffsetForLeaderEpoch request/response)
warn(s"Leader or $followerName is on protocol version where leader epoch is not considered in the OffsetsForLeaderEpoch response. " +
s"The leader's offset ${leaderEpochOffset.endOffset} will be used for truncation in ${replica.topicPartition}.")
OffsetTruncationState(min(leaderEpochOffset.endOffset, replica.logEndOffset.messageOffset), truncationCompleted = true)
} else {
// get (leader epoch, end offset) pair that corresponds to the largest leader epoch
// less than or equal to the requested epoch.
val (followerEpoch, followerEndOffset) = replica.epochs.get.endOffsetFor(leaderEpochOffset.leaderEpoch)
if (followerEndOffset == UNDEFINED_EPOCH_OFFSET) {
// This can happen if the follower was not tracking leader epochs at that point (before the
// upgrade, or if this broker is new). Since the leader replied with an epoch lower than the
// epoch requested by the follower, it should be safe to truncate to the leader's
// offset (this is the same behavior as post-KIP-101 and pre-KIP-279)
warn(s"Based on $followerName's leader epoch, leader replied with epoch ${leaderEpochOffset.leaderEpoch} " +
s"below any $followerName's tracked epochs for ${replica.topicPartition}. " +
s"The leader's offset only ${leaderEpochOffset.endOffset} will be used for truncation.")
OffsetTruncationState(min(leaderEpochOffset.endOffset, replica.logEndOffset.messageOffset), truncationCompleted = true)
} else if (followerEpoch != leaderEpochOffset.leaderEpoch) {
// the follower does not know about the epoch that leader replied with
// we truncate to the end offset of the largest epoch that is smaller than the
// epoch the leader replied with, and send another offset for leader epoch request
val intermediateOffsetToTruncateTo = min(followerEndOffset, replica.logEndOffset.messageOffset)
info(s"Based on $followerName's leader epoch, leader replied with epoch ${leaderEpochOffset.leaderEpoch} " +
s"unknown to the $followerName for ${replica.topicPartition}. " +
s"Will truncate to $intermediateOffsetToTruncateTo and send another leader epoch request to the leader.")
OffsetTruncationState(intermediateOffsetToTruncateTo, truncationCompleted = false)
} else {
val offsetToTruncateTo = min(followerEndOffset, leaderEpochOffset.endOffset)
OffsetTruncationState(min(offsetToTruncateTo, replica.logEndOffset.messageOffset), truncationCompleted = true)
}
}
}
def delayPartitions(partitions: Iterable[TopicPartition], delay: Long) { def delayPartitions(partitions: Iterable[TopicPartition], delay: Long) {
partitionMapLock.lockInterruptibly() partitionMapLock.lockInterruptibly()
try { try {
@ -446,3 +523,10 @@ case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem, truncating
override def toString = "offset:%d-isReadyForFetch:%b-isTruncatingLog:%b".format(fetchOffset, isReadyForFetch, truncatingLog) override def toString = "offset:%d-isReadyForFetch:%b-isTruncatingLog:%b".format(fetchOffset, isReadyForFetch, truncatingLog)
} }
case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) {
def this(offset: Long) = this(offset, true)
override def toString = "offset:%d-truncationCompleted:%b".format(offset, truncationCompleted)
}
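
As a concrete illustration of the intermediate-truncation case described in the comment on getOffsetTruncationState above, consider a hypothetical pair of replicas (made-up epochs and offsets, not taken from this patch or its tests); the values below show which OffsetTruncationState each round would produce:

// Follower epoch cache: epoch 1 -> start 0, epoch 2 -> start 90, epoch 4 -> start 120; log end offset 150.
// Leader epoch cache:   epoch 1 -> start 0, epoch 3 -> start 100, epoch 5 -> start 110.
//
// Round 1: the follower asks for its latest epoch (4). The leader's largest epoch <= 4 is 3,
// whose end offset is 110, so it replies with (leaderEpoch = 3, endOffset = 110). The follower
// does not know epoch 3; its largest epoch below 3 is 2, whose end offset is 120, so:
//   round 1 result: OffsetTruncationState(offset = 120, truncationCompleted = false)
// The partition stays in the truncating state and another OffsetsForLeaderEpoch request is sent.
//
// Round 2: after truncating to 120, the follower's latest epoch is 2. The leader's largest
// epoch <= 2 is 1, whose end offset is 100, so it replies with (leaderEpoch = 1, endOffset = 100).
// The follower knows epoch 1 and its local end offset for it is 90, so truncation completes:
//   round 2 result: OffsetTruncationState(offset = 90, truncationCompleted = true)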

View File

@ -37,7 +37,6 @@ import org.apache.kafka.common.record.{FileRecords, MemoryRecords}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, mutable} import scala.collection.{Map, Seq, Set, mutable}
class ReplicaAlterLogDirsThread(name: String, class ReplicaAlterLogDirsThread(name: String,
sourceBroker: BrokerEndPoint, sourceBroker: BrokerEndPoint,
brokerConfig: KafkaConfig, brokerConfig: KafkaConfig,
@ -102,7 +101,8 @@ class ReplicaAlterLogDirsThread(name: String,
// Append the leader's messages to the log // Append the leader's messages to the log
partition.appendRecordsToFutureReplica(records) partition.appendRecordsToFutureReplica(records)
futureReplica.highWatermark = new LogOffsetMetadata(partitionData.highWatermark) val futureReplicaHighWatermark = futureReplica.logEndOffset.messageOffset.min(partitionData.highWatermark)
futureReplica.highWatermark = new LogOffsetMetadata(futureReplicaHighWatermark)
futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset) futureReplica.maybeIncrementLogStartOffset(partitionData.logStartOffset)
if (partition.maybeReplaceCurrentWithFutureReplica()) if (partition.maybeReplaceCurrentWithFutureReplica())
@ -164,17 +164,32 @@ class ReplicaAlterLogDirsThread(name: String,
def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
partitions.map { case (tp, epoch) => partitions.map { case (tp, epoch) =>
try { try {
tp -> new EpochEndOffset(Errors.NONE, replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch)) val (leaderEpoch, leaderOffset) = replicaMgr.getReplicaOrException(tp).epochs.get.endOffsetFor(epoch)
tp -> new EpochEndOffset(Errors.NONE, leaderEpoch, leaderOffset)
} catch { } catch {
case t: Throwable => case t: Throwable =>
warn(s"Error when getting EpochEndOffset for $tp", t) warn(s"Error when getting EpochEndOffset for $tp", t)
tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH_OFFSET) tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} }
} }
} }
def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = { /**
val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, Long] * Truncate the log for each partition based on current replica's returned epoch and offset.
*
* The logic for finding the truncation offset is the same as in ReplicaFetcherThread
* and mainly implemented in AbstractFetcherThread.getOffsetTruncationState. One difference is
* that the initial fetch offset for topic partition could be set to the truncation offset of
* the current replica if that replica truncates. Otherwise, it is high watermark as in ReplicaFetcherThread.
*
* The reason we have to follow the leader epoch approach for truncating a future replica is to
* cover the case where a future replica is offline when the current replica truncates and
* re-replicates offsets that may have already been copied to the future replica. In that case,
* the future replica may miss "mark for truncation" event and must use the offset for leader epoch
* exchange with the current replica to truncate to the largest common log prefix for the topic partition
*/
def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
val partitionsWithError = mutable.Set[TopicPartition]() val partitionsWithError = mutable.Set[TopicPartition]()
fetchedEpochs.foreach { case (topicPartition, epochOffset) => fetchedEpochs.foreach { case (topicPartition, epochOffset) =>
@ -186,16 +201,10 @@ class ReplicaAlterLogDirsThread(name: String,
info(s"Retrying leaderEpoch request for partition $topicPartition as the current replica reported an error: ${epochOffset.error}") info(s"Retrying leaderEpoch request for partition $topicPartition as the current replica reported an error: ${epochOffset.error}")
partitionsWithError += topicPartition partitionsWithError += topicPartition
} else { } else {
val fetchOffset = val offsetTruncationState = getOffsetTruncationState(topicPartition, epochOffset, futureReplica, isFutureReplica = true)
if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET)
partitionStates.stateValue(topicPartition).fetchOffset
else if (epochOffset.endOffset >= futureReplica.logEndOffset.messageOffset)
futureReplica.logEndOffset.messageOffset
else
epochOffset.endOffset
partition.truncateTo(fetchOffset, isFuture = true) partition.truncateTo(offsetTruncationState.offset, isFuture = true)
fetchOffsets.put(topicPartition, fetchOffset) fetchOffsets.put(topicPartition, offsetTruncationState)
} }
} catch { } catch {
case e: KafkaStorageException => case e: KafkaStorageException =>

View File

@ -21,7 +21,7 @@ import java.util
import AbstractFetcherThread.ResultWithPartitions import AbstractFetcherThread.ResultWithPartitions
import kafka.api.{FetchRequest => _, _} import kafka.api.{FetchRequest => _, _}
import kafka.cluster.{BrokerEndPoint, Replica} import kafka.cluster.BrokerEndPoint
import kafka.log.LogConfig import kafka.log.LogConfig
import kafka.server.ReplicaFetcherThread._ import kafka.server.ReplicaFetcherThread._
import kafka.server.epoch.LeaderEpochCache import kafka.server.epoch.LeaderEpochCache
@ -74,6 +74,9 @@ class ReplicaFetcherThread(name: String,
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1 else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
else 0 else 0
private val offsetForLeaderEpochRequestVersion: Short =
if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV0) 1
else 0
private val fetchMetadataSupported = brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0 private val fetchMetadataSupported = brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0
private val maxWait = brokerConfig.replicaFetchWaitMaxMs private val maxWait = brokerConfig.replicaFetchWaitMaxMs
private val minBytes = brokerConfig.replicaFetchMinBytes private val minBytes = brokerConfig.replicaFetchMinBytes
@ -286,37 +289,31 @@ class ReplicaFetcherThread(name: String,
} }
/** /**
* - Truncate the log to the leader's offset for each partition's epoch. * Truncate the log for each partition's epoch based on leader's returned epoch and offset.
* - If the leader's offset is greater, we stick with the Log End Offset * The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState
* otherwise we truncate to the leaders offset. */
* - If the leader replied with undefined epoch offset we must use the high watermark override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
*/ val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = {
val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, Long]
val partitionsWithError = mutable.Set[TopicPartition]() val partitionsWithError = mutable.Set[TopicPartition]()
fetchedEpochs.foreach { case (tp, epochOffset) => fetchedEpochs.foreach { case (tp, leaderEpochOffset) =>
try { try {
val replica = replicaMgr.getReplicaOrException(tp) val replica = replicaMgr.getReplicaOrException(tp)
val partition = replicaMgr.getPartition(tp).get val partition = replicaMgr.getPartition(tp).get
if (epochOffset.hasError) { if (leaderEpochOffset.hasError) {
info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${epochOffset.error}") info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${leaderEpochOffset.error}")
partitionsWithError += tp partitionsWithError += tp
} else { } else {
val fetchOffset = val offsetTruncationState = getOffsetTruncationState(tp, leaderEpochOffset, replica)
if (epochOffset.endOffset == UNDEFINED_EPOCH_OFFSET) { if (offsetTruncationState.offset < replica.highWatermark.messageOffset)
warn(s"Based on follower's leader epoch, leader replied with an unknown offset in ${replica.topicPartition}. " + warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark ${replica.highWatermark.messageOffset}")
s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
partitionStates.stateValue(tp).fetchOffset
} else if (epochOffset.endOffset >= replica.logEndOffset.messageOffset)
logEndOffset(replica, epochOffset)
else
epochOffset.endOffset
partition.truncateTo(fetchOffset, isFuture = false) partition.truncateTo(offsetTruncationState.offset, isFuture = false)
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, fetchOffset) // mark the future replica for truncation only when we do last truncation
fetchOffsets.put(tp, fetchOffset) if (offsetTruncationState.truncationCompleted)
replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, offsetTruncationState.offset)
fetchOffsets.put(tp, offsetTruncationState)
} }
} catch { } catch {
case e: KafkaStorageException => case e: KafkaStorageException =>
@ -344,7 +341,7 @@ class ReplicaFetcherThread(name: String,
var result: Map[TopicPartition, EpochEndOffset] = null var result: Map[TopicPartition, EpochEndOffset] = null
if (shouldSendLeaderEpochRequest) { if (shouldSendLeaderEpochRequest) {
val partitionsAsJava = partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava val partitionsAsJava = partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.toMap.asJava
val epochRequest = new OffsetsForLeaderEpochRequest.Builder(partitionsAsJava) val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, partitionsAsJava)
try { try {
val response = leaderEndpoint.sendRequest(epochRequest) val response = leaderEndpoint.sendRequest(epochRequest)
result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
@ -355,26 +352,19 @@ class ReplicaFetcherThread(name: String,
// if we get any unexpected exception, mark all partitions with an error // if we get any unexpected exception, mark all partitions with an error
result = partitions.map { case (tp, _) => result = partitions.map { case (tp, _) =>
tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH_OFFSET) tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} }
} }
} else { } else {
// just generate a response with no error but UNDEFINED_OFFSET so that we can fall back to truncating using // just generate a response with no error but UNDEFINED_OFFSET so that we can fall back to truncating using
// high watermark in maybeTruncate() // high watermark in maybeTruncate()
result = partitions.map { case (tp, _) => result = partitions.map { case (tp, _) =>
tp -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET) tp -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} }
} }
result result
} }
private def logEndOffset(replica: Replica, epochOffset: EpochEndOffset): Long = {
val logEndOffset = replica.logEndOffset.messageOffset
info(s"Based on follower's leader epoch, leader replied with an offset ${epochOffset.endOffset} >= the " +
s"follower's log end offset $logEndOffset in ${replica.topicPartition}. No truncation needed.")
logEndOffset
}
/** /**
* To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
* the quota is exceeded and the replica is not in sync. * the quota is exceeded and the replica is not in sync.

View File

@ -1475,11 +1475,11 @@ class ReplicaManager(val config: KafkaConfig,
val epochEndOffset = getPartition(tp) match { val epochEndOffset = getPartition(tp) match {
case Some(partition) => case Some(partition) =>
if (partition eq ReplicaManager.OfflinePartition) if (partition eq ReplicaManager.OfflinePartition)
new EpochEndOffset(KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH_OFFSET) new EpochEndOffset(KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
else else
partition.lastOffsetForLeaderEpoch(leaderEpoch) partition.lastOffsetForLeaderEpoch(leaderEpoch)
case None => case None =>
new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET) new EpochEndOffset(UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} }
tp -> epochEndOffset tp -> epochEndOffset
} }

View File

@ -20,7 +20,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.server.LogOffsetMetadata import kafka.server.LogOffsetMetadata
import kafka.server.checkpoints.LeaderEpochCheckpoint import kafka.server.checkpoints.LeaderEpochCheckpoint
import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.common.requests.EpochEndOffset._
import kafka.utils.CoreUtils._ import kafka.utils.CoreUtils._
import kafka.utils.Logging import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
@ -29,7 +29,7 @@ import scala.collection.mutable.ListBuffer
trait LeaderEpochCache { trait LeaderEpochCache {
def assign(leaderEpoch: Int, offset: Long) def assign(leaderEpoch: Int, offset: Long)
def latestEpoch(): Int def latestEpoch(): Int
def endOffsetFor(epoch: Int): Long def endOffsetFor(epoch: Int): (Int, Long)
def clearAndFlushLatest(offset: Long) def clearAndFlushLatest(offset: Long)
def clearAndFlushEarliest(offset: Long) def clearAndFlushEarliest(offset: Long)
def clearAndFlush() def clearAndFlush()
@ -81,36 +81,42 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM
} }
/** /**
* Returns the End Offset for a requested Leader Epoch. * Returns the Leader Epoch and the End Offset for a requested Leader Epoch.
* *
* This is defined as the start offset of the first Leader Epoch larger than the * The Leader Epoch returned is the largest epoch less than or equal to the requested Leader
* Leader Epoch requested, or else the Log End Offset if the latest epoch was requested. * Epoch. The End Offset is the end offset of this epoch, which is defined as the start offset
* of the first Leader Epoch larger than the Leader Epoch requested, or else the Log End
* Offset if the latest epoch was requested.
* *
* During the upgrade phase, where there are existing messages may not have a leader epoch, * During the upgrade phase, where there are existing messages may not have a leader epoch,
* if requestedEpoch is < the first epoch cached, UNSUPPORTED_EPOCH_OFFSET will be returned * if requestedEpoch is < the first epoch cached, UNSUPPORTED_EPOCH_OFFSET will be returned
* so that the follower falls back to High Water Mark. * so that the follower falls back to High Water Mark.
* *
* @param requestedEpoch * @param requestedEpoch requested leader epoch
* @return offset * @return leader epoch and offset
*/ */
override def endOffsetFor(requestedEpoch: Int): Long = { override def endOffsetFor(requestedEpoch: Int): (Int, Long) = {
inReadLock(lock) { inReadLock(lock) {
val offset = val epochAndOffset =
if (requestedEpoch == UNDEFINED_EPOCH) { if (requestedEpoch == UNDEFINED_EPOCH) {
// this may happen if a bootstrapping follower sends a request with undefined epoch or // this may happen if a bootstrapping follower sends a request with undefined epoch or
// a follower is on the older message format where leader epochs are not recorded // a follower is on the older message format where leader epochs are not recorded
UNDEFINED_EPOCH_OFFSET (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} else if (requestedEpoch == latestEpoch) { } else if (requestedEpoch == latestEpoch) {
leo().messageOffset (requestedEpoch, leo().messageOffset)
} else { } else {
val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch) val (subsequentEpochs, previousEpochs) = epochs.partition { e => e.epoch > requestedEpoch}
if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch) if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch)
UNDEFINED_EPOCH_OFFSET // no epochs recorded or requested epoch < the first epoch cached
else (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
subsequentEpochs.head.startOffset else {
// we must get at least one element in the previousEpochs list, because if we are here,
// it means that requestedEpoch >= epochs.head.epoch -- so at least the first epoch is in previousEpochs
(previousEpochs.last.epoch, subsequentEpochs.head.startOffset)
}
} }
debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning offset $offset from epoch list of size ${epochs.size}") debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning epoch ${epochAndOffset._1} and offset ${epochAndOffset._2} from epoch list of size ${epochs.size}")
offset epochAndOffset
} }
} }
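
For reference, a hypothetical cache state illustrating the new endOffsetFor return values (made-up numbers; leo() is assumed to be 300):

// Cache entries: epoch 1 -> start offset 0, epoch 2 -> start offset 100, epoch 4 -> start offset 200.
//   endOffsetFor(4) == (4, 300)   // latest epoch: paired with the log end offset
//   endOffsetFor(3) == (2, 200)   // epoch 3 unknown: largest cached epoch <= 3 and its end offset
//   endOffsetFor(2) == (2, 200)   // end offset = start offset of the next cached epoch (4)
//   endOffsetFor(0) == (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)   // below the first cached epoch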

View File

@ -290,7 +290,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
} }
private def offsetsForLeaderEpochRequest = { private def offsetsForLeaderEpochRequest = {
new OffsetsForLeaderEpochRequest.Builder().add(tp, 7).build() new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()).add(tp, 7).build()
} }
private def createOffsetFetchRequest = { private def createOffsetFetchRequest = {

View File

@ -21,6 +21,7 @@ import AbstractFetcherThread._
import com.yammer.metrics.Metrics import com.yammer.metrics.Metrics
import kafka.cluster.BrokerEndPoint import kafka.cluster.BrokerEndPoint
import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData} import kafka.server.AbstractFetcherThread.{FetchRequest, PartitionData}
import kafka.server.OffsetTruncationState
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
@ -131,7 +132,7 @@ class AbstractFetcherThreadTest {
override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() } override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() }
override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, Long]] = { override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
ResultWithPartitions(Map(), Set()) ResultWithPartitions(Map(), Set())
} }
} }

View File

@ -61,7 +61,7 @@ class ReplicaAlterLogDirsThreadTest {
//Stubs //Stubs
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(leo).anyTimes() expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, leo)).anyTimes()
stub(replica, replica, futureReplica, partition, replicaManager) stub(replica, replica, futureReplica, partition, replicaManager)
replay(leaderEpochs, replicaManager, replica) replay(leaderEpochs, replicaManager, replica)
@ -78,8 +78,8 @@ class ReplicaAlterLogDirsThreadTest {
val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> leaderEpoch)) val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> leaderEpoch))
val expected = Map( val expected = Map(
t1p0 -> new EpochEndOffset(Errors.NONE, leo), t1p0 -> new EpochEndOffset(Errors.NONE, leaderEpoch, leo),
t1p1 -> new EpochEndOffset(Errors.NONE, leo) t1p1 -> new EpochEndOffset(Errors.NONE, leaderEpoch, leo)
) )
assertEquals("results from leader epoch request should have offset from local replica", assertEquals("results from leader epoch request should have offset from local replica",
@ -101,7 +101,7 @@ class ReplicaAlterLogDirsThreadTest {
//Stubs //Stubs
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(leo).anyTimes() expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, leo)).anyTimes()
expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).anyTimes() expect(replicaManager.getReplicaOrException(t1p0)).andReturn(replica).anyTimes()
expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes() expect(replicaManager.getPartition(t1p0)).andReturn(Some(partition)).anyTimes()
expect(replicaManager.getReplicaOrException(t1p1)).andThrow(new KafkaStorageException).once() expect(replicaManager.getReplicaOrException(t1p1)).andThrow(new KafkaStorageException).once()
@ -121,8 +121,8 @@ class ReplicaAlterLogDirsThreadTest {
val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> leaderEpoch)) val result = thread.fetchEpochsFromLeader(Map(t1p0 -> leaderEpoch, t1p1 -> leaderEpoch))
val expected = Map( val expected = Map(
t1p0 -> new EpochEndOffset(Errors.NONE, leo), t1p0 -> new EpochEndOffset(Errors.NONE, leaderEpoch, leo),
t1p1 -> new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH_OFFSET) t1p1 -> new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
) )
assertEquals(expected, result) assertEquals(expected, result)
@ -161,8 +161,10 @@ class ReplicaAlterLogDirsThreadTest {
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes() expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes() expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes()
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch).anyTimes() expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch).anyTimes()
expect(leaderEpochsT1p0.endOffsetFor(leaderEpoch)).andReturn(replicaT1p0LEO).anyTimes() expect(leaderEpochsT1p0.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, replicaT1p0LEO)).anyTimes()
expect(leaderEpochsT1p1.endOffsetFor(leaderEpoch)).andReturn(replicaT1p1LEO).anyTimes() expect(leaderEpochsT1p1.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, replicaT1p1LEO)).anyTimes()
expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, futureReplicaLEO)).anyTimes()
expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes()
stubWithFetchMessages(replicaT1p0, replicaT1p1, futureReplica, partition, replicaManager, responseCallback) stubWithFetchMessages(replicaT1p0, replicaT1p1, futureReplica, partition, replicaManager, responseCallback)
@ -188,6 +190,73 @@ class ReplicaAlterLogDirsThreadTest {
assertTrue(truncateToCapture.getValues.asScala.contains(futureReplicaLEO)) assertTrue(truncateToCapture.getValues.asScala.contains(futureReplicaLEO))
} }
@Test
def shouldTruncateToEndOffsetOfLargestCommonEpoch(): Unit = {
//Create a capture to track what partitions/offsets are truncated
val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
// Setup all the dependencies
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createMock(classOf[LeaderEpochCache])
val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochCache])
val logManager = createMock(classOf[LogManager])
val replica = createNiceMock(classOf[Replica])
// one future replica mock because our mocking methods return same values for both future replicas
val futureReplica = createNiceMock(classOf[Replica])
val partition = createMock(classOf[Partition])
val replicaManager = createMock(classOf[ReplicaManager])
val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit] = EasyMock.newCapture()
val leaderEpoch = 5
val futureReplicaLEO = 195
val replicaLEO = 200
val replicaEpochEndOffset = 190
val futureReplicaEpochEndOffset = 191
//Stubs
expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes()
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch).once()
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch - 2).once()
// leader replica truncated and fetched new offsets with new leader epoch
expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch - 1, replicaLEO)).anyTimes()
// but future replica does not know about this leader epoch, so returns a smaller leader epoch
expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch - 1)).andReturn((leaderEpoch - 2, futureReplicaLEO)).anyTimes()
// finally, the leader replica knows about the leader epoch and returns end offset
expect(leaderEpochs.endOffsetFor(leaderEpoch - 2)).andReturn((leaderEpoch - 2, replicaEpochEndOffset)).anyTimes()
expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch - 2)).andReturn((leaderEpoch - 2, futureReplicaEpochEndOffset)).anyTimes()
expect(replicaManager.logManager).andReturn(logManager).anyTimes()
stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback)
replay(leaderEpochs, futureReplicaLeaderEpochs, replicaManager, logManager, quotaManager, replica, futureReplica, partition)
//Create the thread
val endPoint = new BrokerEndPoint(0, "localhost", 1000)
val thread = new ReplicaAlterLogDirsThread(
"alter-logs-dirs-thread-test1",
sourceBroker = endPoint,
brokerConfig = config,
replicaMgr = replicaManager,
quota = quotaManager,
brokerTopicStats = null)
thread.addPartitions(Map(t1p0 -> 0))
// First run will result in another offset for leader epoch request
thread.doWork()
// Second run should actually truncate
thread.doWork()
//We should have truncated to the offsets in the response
assertTrue("Expected offset " + replicaEpochEndOffset + " in captured truncation offsets " + truncateToCapture.getValues,
truncateToCapture.getValues.asScala.contains(replicaEpochEndOffset))
}
@Test @Test
def shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset(): Unit = { def shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset(): Unit = {
@ -219,9 +288,9 @@ class ReplicaAlterLogDirsThreadTest {
// pretend this is a completely new future replica, with no leader epochs recorded // pretend this is a completely new future replica, with no leader epochs recorded
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(UNDEFINED_EPOCH).anyTimes() expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(UNDEFINED_EPOCH).anyTimes()
// since UNDEFINED_EPOCH is -1 wich will be lower than any valid leader epoch, the method // since UNDEFINED_EPOCH is -1 which will be lower than any valid leader epoch, the method
// will return UNDEFINED_EPOCH_OFFSET if requested epoch is lower than the first epoch cached // will return UNDEFINED_EPOCH_OFFSET if requested epoch is lower than the first epoch cached
expect(leaderEpochs.endOffsetFor(UNDEFINED_EPOCH)).andReturn(UNDEFINED_EPOCH_OFFSET).anyTimes() expect(leaderEpochs.endOffsetFor(UNDEFINED_EPOCH)).andReturn((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)).anyTimes()
stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback) stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback)
replay(replicaManager, logManager, quotaManager, leaderEpochs, futureReplicaLeaderEpochs, replay(replicaManager, logManager, quotaManager, leaderEpochs, futureReplicaLeaderEpochs,
replica, futureReplica, partition) replica, futureReplica, partition)
@ -273,7 +342,8 @@ class ReplicaAlterLogDirsThreadTest {
expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes() expect(futureReplica.epochs).andReturn(Some(futureReplicaLeaderEpochs)).anyTimes()
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(futureReplicaLeaderEpoch).anyTimes() expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(futureReplicaLeaderEpoch).anyTimes()
expect(leaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn(replicaLEO).anyTimes() expect(leaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn((futureReplicaLeaderEpoch, replicaLEO)).anyTimes()
expect(futureReplicaLeaderEpochs.endOffsetFor(futureReplicaLeaderEpoch)).andReturn((futureReplicaLeaderEpoch, futureReplicaLEO)).anyTimes()
expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes() expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes()
expect(replicaManager.getReplica(t1p0)).andReturn(Some(replica)).anyTimes() expect(replicaManager.getReplica(t1p0)).andReturn(Some(replica)).anyTimes()
@ -355,7 +425,8 @@ class ReplicaAlterLogDirsThreadTest {
expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes() expect(futureReplica.logEndOffset).andReturn(new LogOffsetMetadata(futureReplicaLEO)).anyTimes()
expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch) expect(futureReplicaLeaderEpochs.latestEpoch).andReturn(leaderEpoch)
expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn(replicaLEO) expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, replicaLEO))
expect(futureReplicaLeaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, futureReplicaLEO))
expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes()
stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback) stubWithFetchMessages(replica, replica, futureReplica, partition, replicaManager, responseCallback)
@ -20,7 +20,6 @@ import kafka.cluster.{BrokerEndPoint, Replica}
import kafka.log.LogManager import kafka.log.LogManager
import kafka.cluster.Partition import kafka.cluster.Partition
import kafka.server.epoch.LeaderEpochCache import kafka.server.epoch.LeaderEpochCache
import org.apache.kafka.common.requests.EpochEndOffset._
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
@ -28,6 +27,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.requests.EpochEndOffset import org.apache.kafka.common.requests.EpochEndOffset
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.utils.SystemTime import org.apache.kafka.common.utils.SystemTime
import org.easymock.EasyMock._ import org.easymock.EasyMock._
import org.easymock.{Capture, CaptureType} import org.easymock.{Capture, CaptureType}
@ -43,17 +43,18 @@ class ReplicaFetcherThreadTest {
private val t1p1 = new TopicPartition("topic1", 1) private val t1p1 = new TopicPartition("topic1", 1)
private val t2p1 = new TopicPartition("topic2", 1) private val t2p1 = new TopicPartition("topic2", 1)
private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)
@Test @Test
def shouldNotIssueLeaderEpochRequestIfInterbrokerVersionBelow11(): Unit = { def shouldNotIssueLeaderEpochRequestIfInterbrokerVersionBelow11(): Unit = {
val props = TestUtils.createBrokerConfig(1, "localhost:1234") val props = TestUtils.createBrokerConfig(1, "localhost:1234")
props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2") props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2") props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
val config = KafkaConfig.fromProps(props) val config = KafkaConfig.fromProps(props)
val endPoint = new BrokerEndPoint(0, "localhost", 1000)
val thread = new ReplicaFetcherThread( val thread = new ReplicaFetcherThread(
name = "bob", name = "bob",
fetcherId = 0, fetcherId = 0,
sourceBroker = endPoint, sourceBroker = brokerEndPoint,
brokerConfig = config, brokerConfig = config,
replicaMgr = null, replicaMgr = null,
metrics = new Metrics(), metrics = new Metrics(),
@ -64,8 +65,8 @@ class ReplicaFetcherThreadTest {
val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0)) val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0))
val expected = Map( val expected = Map(
t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET), t1p0 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET),
t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH_OFFSET) t1p1 -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
) )
assertEquals("results from leader epoch request should have undefined offset", expected, result) assertEquals("results from leader epoch request should have undefined offset", expected, result)
@ -75,7 +76,6 @@ class ReplicaFetcherThreadTest {
def shouldHandleExceptionFromBlockingSend(): Unit = { def shouldHandleExceptionFromBlockingSend(): Unit = {
val props = TestUtils.createBrokerConfig(1, "localhost:1234") val props = TestUtils.createBrokerConfig(1, "localhost:1234")
val config = KafkaConfig.fromProps(props) val config = KafkaConfig.fromProps(props)
val endPoint = new BrokerEndPoint(0, "localhost", 1000)
val mockBlockingSend = createMock(classOf[BlockingSend]) val mockBlockingSend = createMock(classOf[BlockingSend])
expect(mockBlockingSend.sendRequest(anyObject())).andThrow(new NullPointerException).once() expect(mockBlockingSend.sendRequest(anyObject())).andThrow(new NullPointerException).once()
@ -84,7 +84,7 @@ class ReplicaFetcherThreadTest {
val thread = new ReplicaFetcherThread( val thread = new ReplicaFetcherThread(
name = "bob", name = "bob",
fetcherId = 0, fetcherId = 0,
sourceBroker = endPoint, sourceBroker = brokerEndPoint,
brokerConfig = config, brokerConfig = config,
replicaMgr = null, replicaMgr = null,
metrics = new Metrics(), metrics = new Metrics(),
@ -95,8 +95,8 @@ class ReplicaFetcherThreadTest {
val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0)) val result = thread.fetchEpochsFromLeader(Map(t1p0 -> 0, t1p1 -> 0))
val expected = Map( val expected = Map(
t1p0 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH_OFFSET), t1p0 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET),
t1p1 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH_OFFSET) t1p1 -> new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
) )
assertEquals("results from leader epoch request should have undefined offset", expected, result) assertEquals("results from leader epoch request should have undefined offset", expected, result)
@ -104,7 +104,7 @@ class ReplicaFetcherThreadTest {
} }
@Test @Test
def shouldFetchLeaderEpochOnFirstFetchOnly(): Unit = { def shouldFetchLeaderEpochOnFirstFetchOnlyIfLeaderEpochKnownToBoth(): Unit = {
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234")) val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
//Setup all dependencies //Setup all dependencies
@ -116,27 +116,29 @@ class ReplicaFetcherThreadTest {
val partition = createMock(classOf[Partition]) val partition = createMock(classOf[Partition])
val replicaManager = createMock(classOf[ReplicaManager]) val replicaManager = createMock(classOf[ReplicaManager])
val leaderEpoch = 5
//Stubs //Stubs
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(5) expect(replica.highWatermark).andReturn(new LogOffsetMetadata(0)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch)
expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 0)).anyTimes()
expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes()
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
stub(replica, partition, replicaManager) stub(replica, partition, replicaManager)
//Expectations //Expectations
expect(partition.truncateTo(anyLong(), anyBoolean())).once expect(partition.truncateTo(anyLong(), anyBoolean())).once
replay(leaderEpochs, replicaManager, logManager, quota, replica) replay(leaderEpochs, replicaManager, logManager, quota, replica)
//Define the offsets for the OffsetsForLeaderEpochResponse //Define the offsets for the OffsetsForLeaderEpochResponse
val offsets = Map(t1p0 -> new EpochEndOffset(1), t1p1 -> new EpochEndOffset(1)).asJava val offsets = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new EpochEndOffset(leaderEpoch, 1)).asJava
//Create the fetcher thread //Create the fetcher thread
val endPoint = new BrokerEndPoint(0, "localhost", 1000) val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime())
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, endPoint, new SystemTime()) val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0)) thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
//Loop 1 //Loop 1
@ -174,13 +176,16 @@ class ReplicaFetcherThreadTest {
val partition = createMock(classOf[Partition]) val partition = createMock(classOf[Partition])
val replicaManager = createMock(classOf[ReplicaManager]) val replicaManager = createMock(classOf[ReplicaManager])
val leaderEpoch = 5
val initialLEO = 200 val initialLEO = 200
//Stubs //Stubs
expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes() expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(5).anyTimes() expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 1)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch).anyTimes()
expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, initialLEO)).anyTimes()
expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes()
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
stub(replica, partition, replicaManager) stub(replica, partition, replicaManager)
@ -188,20 +193,208 @@ class ReplicaFetcherThreadTest {
replay(leaderEpochs, replicaManager, logManager, quota, replica, partition) replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
//Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation
val offsetsReply = Map(t1p0 -> new EpochEndOffset(156), t2p1 -> new EpochEndOffset(172)).asJava val offsetsReply = Map(t1p0 -> new EpochEndOffset(leaderEpoch, 156), t2p1 -> new EpochEndOffset(leaderEpoch, 172)).asJava
//Create the thread //Create the thread
val endPoint = new BrokerEndPoint(0, "localhost", 1000) val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime()) val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0)) thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0))
//Run it //Run it
thread.doWork() thread.doWork()
//We should have truncated to the offsets in the response //We should have truncated to the offsets in the response
assertTrue(truncateToCapture.getValues.asScala.contains(156)) assertTrue("Expected " + t1p0 + " to truncate to offset 156 (truncation offsets: " + truncateToCapture.getValues + ")",
assertTrue(truncateToCapture.getValues.asScala.contains(172)) truncateToCapture.getValues.asScala.contains(156))
assertTrue("Expected " + t2p1 + " to truncate to offset 172 (truncation offsets: " + truncateToCapture.getValues + ")",
truncateToCapture.getValues.asScala.contains(172))
}
@Test
def shouldTruncateToOffsetSpecifiedInEpochOffsetResponseIfFollowerHasNoMoreEpochs(): Unit = {
// Create a capture to track what partitions/offsets are truncated
val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
// Setup all the dependencies
val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
val quota = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createMock(classOf[LeaderEpochCache])
val logManager = createMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
val partition = createMock(classOf[Partition])
val replicaManager = createMock(classOf[ReplicaManager])
val leaderEpochAtFollower = 5
val leaderEpochAtLeader = 4
val initialLEO = 200
//Stubs
expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 3)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(leaderEpochAtFollower).anyTimes()
expect(leaderEpochs.endOffsetFor(leaderEpochAtLeader)).andReturn((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)).anyTimes()
expect(replicaManager.logManager).andReturn(logManager).anyTimes()
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
stub(replica, partition, replicaManager)
replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
//Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation
val offsetsReply = Map(t1p0 -> new EpochEndOffset(leaderEpochAtLeader, 156),
t2p1 -> new EpochEndOffset(leaderEpochAtLeader, 202)).asJava
//Create the thread
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0))
//Run it
thread.doWork()
//We should have truncated to the offsets in the response
assertTrue("Expected " + t1p0 + " to truncate to offset 156 (truncation offsets: " + truncateToCapture.getValues + ")",
truncateToCapture.getValues.asScala.contains(156))
assertTrue("Expected " + t2p1 + " to truncate to offset " + initialLEO +
" (truncation offsets: " + truncateToCapture.getValues + ")",
truncateToCapture.getValues.asScala.contains(initialLEO))
}
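As an aside on the assertion just above: the follower never truncates past its own log end offset, so the leader's end offset of 202 for t2p1 is clamped to the follower's LEO of 200. A minimal standalone sketch of that clamp (illustrative only, not the broker code; the method name is invented):

object TruncationClampSketch extends App {
  // Illustrative helper, not ReplicaFetcherThread code: the truncation target is the
  // smaller of the leader's epoch end offset and the follower's own log end offset.
  def truncationOffset(leaderEpochEndOffset: Long, followerLogEndOffset: Long): Long =
    math.min(leaderEpochEndOffset, followerLogEndOffset)

  assert(truncationOffset(156L, 200L) == 156L) // t1p0: leader offset below the follower LEO
  assert(truncationOffset(202L, 200L) == 200L) // t2p1: leader offset beyond the follower LEO, clamp to 200
}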
@Test
def shouldFetchLeaderEpochSecondTimeIfLeaderRepliesWithEpochNotKnownToFollower(): Unit = {
// Create a capture to track what partitions/offsets are truncated
val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
// Setup all dependencies
val quota = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val logManager = createMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
val partition = createMock(classOf[Partition])
val replicaManager = createMock(classOf[ReplicaManager])
val initialLEO = 200
// Stubs
expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 2)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(5)
expect(leaderEpochs.endOffsetFor(4)).andReturn((3, 120)).anyTimes()
expect(leaderEpochs.endOffsetFor(3)).andReturn((3, 120)).anyTimes()
expect(replicaManager.logManager).andReturn(logManager).anyTimes()
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
stub(replica, partition, replicaManager)
replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
// Define the offsets for the OffsetsForLeaderEpochResponse
val offsets = Map(t1p0 -> new EpochEndOffset(4, 155), t1p1 -> new EpochEndOffset(4, 143)).asJava
// Create the fetcher thread
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime())
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
// Loop 1 -- both topic partitions will need to fetch another leader epoch
thread.doWork()
assertEquals(1, mockNetwork.epochFetchCount)
assertEquals(0, mockNetwork.fetchCount)
// Loop 2 should do the second fetch for both topic partitions because the leader replied with
// epoch 4 while follower knows only about epoch 3
val nextOffsets = Map(t1p0 -> new EpochEndOffset(3, 101), t1p1 -> new EpochEndOffset(3, 102)).asJava
mockNetwork.setOffsetsForNextResponse(nextOffsets)
thread.doWork()
assertEquals(2, mockNetwork.epochFetchCount)
assertEquals(1, mockNetwork.fetchCount)
assertEquals("OffsetsForLeaderEpochRequest version.",
1, mockNetwork.lastUsedOffsetForLeaderEpochVersion)
//Loop 3 we should not fetch epochs
thread.doWork()
assertEquals(2, mockNetwork.epochFetchCount)
assertEquals(2, mockNetwork.fetchCount)
//We should have truncated to the offsets in the second response
assertTrue("Expected " + t1p1 + " to truncate to offset 102 (truncation offsets: " + truncateToCapture.getValues + ")",
truncateToCapture.getValues.asScala.contains(102))
assertTrue("Expected " + t1p0 + " to truncate to offset 101 (truncation offsets: " + truncateToCapture.getValues + ")",
truncateToCapture.getValues.asScala.contains(101))
}
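A rough standalone sketch of the follower-side step exercised in loop 2 above (illustrative only, not the ReplicaFetcherThread code; the cache contents are invented): when the leader answers with an epoch the follower has not seen, the follower picks its own largest epoch at or below that value and asks the leader again for it.

object EpochWalkSketch extends App {
  // Hypothetical follower epoch cache: leader epoch -> start offset of that epoch.
  val followerEpochs: Map[Int, Long] = Map(3 -> 80L, 5 -> 120L)

  // Largest epoch the follower knows about that is <= the epoch the leader replied with.
  def largestKnownEpochAtMost(leaderReplyEpoch: Int): Option[Int] =
    followerEpochs.keys.filter(_ <= leaderReplyEpoch).reduceOption(_ max _)

  // The leader replied with epoch 4; the follower only knows 3 and 5, so the
  // next OffsetsForLeaderEpoch request asks about epoch 3, as in loop 2 above.
  assert(largestKnownEpochAtMost(4).contains(3))
}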
@Test
def shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20(): Unit = {
// Create a capture to track what partitions/offsets are truncated
val truncateToCapture: Capture[Long] = newCapture(CaptureType.ALL)
val props = TestUtils.createBrokerConfig(1, "localhost:1234")
props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.11.0")
val config = KafkaConfig.fromProps(props)
// Setup all dependencies
val quota = createNiceMock(classOf[ReplicationQuotaManager])
val leaderEpochs = createNiceMock(classOf[LeaderEpochCache])
val logManager = createMock(classOf[LogManager])
val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
val replica = createNiceMock(classOf[Replica])
val partition = createMock(classOf[Partition])
val replicaManager = createMock(classOf[ReplicaManager])
val initialLEO = 200
// Stubs
expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).anyTimes()
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 2)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(5)
expect(leaderEpochs.endOffsetFor(4)).andReturn((3, 120)).anyTimes()
expect(leaderEpochs.endOffsetFor(3)).andReturn((3, 120)).anyTimes()
expect(replicaManager.logManager).andReturn(logManager).anyTimes()
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
stub(replica, partition, replicaManager)
replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
// Define the offsets for the OffsetsForLeaderEpochResponse with undefined epoch to simulate
// older protocol version
val offsets = Map(t1p0 -> new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, 155), t1p1 -> new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, 143)).asJava
// Create the fetcher thread
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsets, brokerEndPoint, new SystemTime())
val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
// Loop 1 -- both topic partitions will truncate to leader offset even though they don't know
// about leader epoch
thread.doWork()
assertEquals(1, mockNetwork.epochFetchCount)
assertEquals(1, mockNetwork.fetchCount)
assertEquals("OffsetsForLeaderEpochRequest version.",
0, mockNetwork.lastUsedOffsetForLeaderEpochVersion)
//Loop 2 we should not fetch epochs
thread.doWork()
assertEquals(1, mockNetwork.epochFetchCount)
assertEquals(2, mockNetwork.fetchCount)
//We should have truncated to the offsets in the first response
assertTrue("Expected " + t1p0 + " to truncate to offset 155 (truncation offsets: " + truncateToCapture.getValues + ")",
truncateToCapture.getValues.asScala.contains(155))
assertTrue("Expected " + t1p1 + " to truncate to offset 143 (truncation offsets: " + truncateToCapture.getValues + ")",
truncateToCapture.getValues.asScala.contains(143))
} }
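The two version assertions above (v0 here, v1 in the previous test) come down to gating the request version on the inter-broker protocol version. A minimal sketch of that kind of check, assuming (per the new-protocol note in the upgrade docs further down) that v1 is only used from inter-broker protocol 2.0 onwards, with the version modelled as a (major, minor) pair:

object EpochRequestVersionSketch extends App {
  // Illustrative only: choose the OffsetsForLeaderEpoch request version from the
  // inter-broker protocol version, modelled here as a (major, minor) pair.
  def offsetForLeaderEpochRequestVersion(ibp: (Int, Int)): Int =
    if (ibp._1 >= 2) 1 else 0 // v1 (which carries leader_epoch) only once the cluster speaks 2.0+

  assert(offsetForLeaderEpochRequestVersion((0, 11)) == 0) // inter.broker.protocol.version=0.11.0, as in this test
  assert(offsetForLeaderEpochRequestVersion((2, 0)) == 1)  // default config, as in the previous test
}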
@Test @Test
@ -227,6 +420,7 @@ class ReplicaFetcherThreadTest {
expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes() expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes()
expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialFetchOffset)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(5) expect(leaderEpochs.latestEpoch).andReturn(5)
expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes()
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
@ -234,18 +428,17 @@ class ReplicaFetcherThreadTest {
replay(leaderEpochs, replicaManager, logManager, quota, replica, partition) replay(leaderEpochs, replicaManager, logManager, quota, replica, partition)
//Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation
val offsetsReply = Map(t1p0 -> new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH_OFFSET)).asJava val offsetsReply = Map(t1p0 -> new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)).asJava
//Create the thread //Create the thread
val endPoint = new BrokerEndPoint(0, "localhost", 1000) val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime()) val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> initialFetchOffset)) thread.addPartitions(Map(t1p0 -> initialFetchOffset))
//Run it //Run it
thread.doWork() thread.doWork()
//We should have truncated to the highwatermark for partitino 2 only //We should have truncated to initial fetch offset
assertEquals(initialFetchOffset, truncated.getValue) assertEquals(initialFetchOffset, truncated.getValue)
} }
@ -265,6 +458,7 @@ class ReplicaFetcherThreadTest {
val partition = createMock(classOf[Partition]) val partition = createMock(classOf[Partition])
val replicaManager = createMock(classOf[kafka.server.ReplicaManager]) val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
val leaderEpoch = 5
val highWaterMark = 100 val highWaterMark = 100
val initialLeo = 300 val initialLeo = 300
@ -273,7 +467,9 @@ class ReplicaFetcherThreadTest {
expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes() expect(partition.truncateTo(capture(truncated), anyBoolean())).anyTimes()
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLeo)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(5) expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch)
// this is for the last reply with EpochEndOffset(5, 156)
expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, initialLeo)).anyTimes()
expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes()
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
stub(replica, partition, replicaManager) stub(replica, partition, replicaManager)
@ -281,14 +477,13 @@ class ReplicaFetcherThreadTest {
//Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation
val offsetsReply = mutable.Map( val offsetsReply = mutable.Map(
t1p0 -> new EpochEndOffset(NOT_LEADER_FOR_PARTITION, EpochEndOffset.UNDEFINED_EPOCH_OFFSET), t1p0 -> new EpochEndOffset(NOT_LEADER_FOR_PARTITION, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET),
t1p1 -> new EpochEndOffset(UNKNOWN_SERVER_ERROR, EpochEndOffset.UNDEFINED_EPOCH_OFFSET) t1p1 -> new EpochEndOffset(UNKNOWN_SERVER_ERROR, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
).asJava ).asJava
//Create the thread //Create the thread
val endPoint = new BrokerEndPoint(0, "localhost", 1000) val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime()) val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
val thread = new ReplicaFetcherThread("bob", 0, endPoint, configs(0), replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0)) thread.addPartitions(Map(t1p0 -> 0, t2p1 -> 0))
//Run thread 3 times //Run thread 3 times
@ -300,7 +495,7 @@ class ReplicaFetcherThreadTest {
assertEquals(0, truncated.getValues.size()) assertEquals(0, truncated.getValues.size())
//New leader elected and replies //New leader elected and replies
offsetsReply.put(t1p0, new EpochEndOffset(156)) offsetsReply.put(t1p0, new EpochEndOffset(leaderEpoch, 156))
thread.doWork() thread.doWork()
@ -321,10 +516,14 @@ class ReplicaFetcherThreadTest {
val partition = createMock(classOf[Partition]) val partition = createMock(classOf[Partition])
val replicaManager = createNiceMock(classOf[ReplicaManager]) val replicaManager = createNiceMock(classOf[ReplicaManager])
val leaderEpoch = 4
//Stub return values //Stub return values
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(0)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(5) expect(replica.highWatermark).andReturn(new LogOffsetMetadata(0)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(leaderEpoch)
expect(leaderEpochs.endOffsetFor(leaderEpoch)).andReturn((leaderEpoch, 0)).anyTimes()
expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes()
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
stub(replica, partition, replicaManager) stub(replica, partition, replicaManager)
@ -333,13 +532,12 @@ class ReplicaFetcherThreadTest {
//Define the offsets for the OffsetsForLeaderEpochResponse //Define the offsets for the OffsetsForLeaderEpochResponse
val offsetsReply = Map( val offsetsReply = Map(
t1p0 -> new EpochEndOffset(1), t1p1 -> new EpochEndOffset(1) t1p0 -> new EpochEndOffset(leaderEpoch, 1), t1p1 -> new EpochEndOffset(leaderEpoch, 1)
).asJava ).asJava
//Create the fetcher thread //Create the fetcher thread
val endPoint = new BrokerEndPoint(0, "localhost", 1000) val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime()) val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
//When //When
thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0)) thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
@ -373,7 +571,9 @@ class ReplicaFetcherThreadTest {
expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).once expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).once
expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes() expect(replica.epochs).andReturn(Some(leaderEpochs)).anyTimes()
expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes() expect(replica.logEndOffset).andReturn(new LogOffsetMetadata(initialLEO)).anyTimes()
expect(replica.highWatermark).andReturn(new LogOffsetMetadata(initialLEO - 2)).anyTimes()
expect(leaderEpochs.latestEpoch).andReturn(5) expect(leaderEpochs.latestEpoch).andReturn(5)
expect(leaderEpochs.endOffsetFor(5)).andReturn((5, initialLEO)).anyTimes()
expect(replicaManager.logManager).andReturn(logManager).anyTimes() expect(replicaManager.logManager).andReturn(logManager).anyTimes()
expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes() expect(replicaManager.replicaAlterLogDirsManager).andReturn(replicaAlterLogDirsManager).anyTimes()
stub(replica, partition, replicaManager) stub(replica, partition, replicaManager)
@ -382,13 +582,12 @@ class ReplicaFetcherThreadTest {
//Define the offsets for the OffsetsForLeaderEpochResponse //Define the offsets for the OffsetsForLeaderEpochResponse
val offsetsReply = Map( val offsetsReply = Map(
t1p0 -> new EpochEndOffset(52), t1p1 -> new EpochEndOffset(49) t1p0 -> new EpochEndOffset(5, 52), t1p1 -> new EpochEndOffset(5, 49)
).asJava ).asJava
//Create the fetcher thread //Create the fetcher thread
val endPoint = new BrokerEndPoint(0, "localhost", 1000) val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, brokerEndPoint, new SystemTime())
val mockNetwork = new ReplicaFetcherMockBlockingSend(offsetsReply, endPoint, new SystemTime()) val thread = new ReplicaFetcherThread("bob", 0, brokerEndPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
val thread = new ReplicaFetcherThread("bob", 0, endPoint, config, replicaManager, new Metrics(), new SystemTime(), quota, Some(mockNetwork))
//When //When
thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0)) thread.addPartitions(Map(t1p0 -> 0, t1p1 -> 0))
@ -417,4 +616,4 @@ class ReplicaFetcherThreadTest {
expect(replicaManager.getReplicaOrException(t2p1)).andReturn(replica).anyTimes() expect(replicaManager.getReplicaOrException(t2p1)).andReturn(replica).anyTimes()
expect(replicaManager.getPartition(t2p1)).andReturn(Some(partition)).anyTimes() expect(replicaManager.getPartition(t2p1)).andReturn(Some(partition)).anyTimes()
} }
} }
@ -258,7 +258,7 @@ class RequestQuotaTest extends BaseRequestTest {
new InitProducerIdRequest.Builder("abc") new InitProducerIdRequest.Builder("abc")
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
new OffsetsForLeaderEpochRequest.Builder().add(tp, 0) new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()).add(tp, 0)
case ApiKeys.ADD_PARTITIONS_TO_TXN => case ApiKeys.ADD_PARTITIONS_TO_TXN =>
new AddPartitionsToTxnRequest.Builder("test-transactional-id", 1, 0, List(tp).asJava) new AddPartitionsToTxnRequest.Builder("test-transactional-id", 1, 0, List(tp).asJava)
@ -38,6 +38,7 @@ import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable.{ListBuffer => Buffer} import scala.collection.mutable.{ListBuffer => Buffer}
import scala.collection.Seq
/** /**
* These tests were written to assert the addition of leader epochs to the replication protocol fix the problems * These tests were written to assert the addition of leader epochs to the replication protocol fix the problems
@ -298,6 +299,86 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
} }
} }
@Test
def logsShouldNotDivergeOnUncleanLeaderElections(): Unit = {
// Given two brokers, unclean leader election is enabled
brokers = (100 to 101).map(createBroker(_, enableUncleanLeaderElection = true))
// A single partition topic with 2 replicas, min.isr = 1
adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(
topic, Map(0 -> Seq(100, 101)), config = CoreUtils.propsWith((KafkaConfig.MinInSyncReplicasProp, "1"))
)
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
// Write one message while both brokers are up
(0 until 1).foreach { i =>
producer.send(new ProducerRecord(topic, 0, null, msg))
producer.flush()}
// Since we use producer with acks = 1, make sure that logs match for the first epoch
waitForLogsToMatch(brokers(0), brokers(1))
// shutdown broker 100
brokers(0).shutdown()
//Write 1 message
(0 until 1).foreach { i =>
producer.send(new ProducerRecord(topic, 0, null, msg))
producer.flush()}
brokers(1).shutdown()
brokers(0).startup()
//Bounce the producer (this is required, probably because the broker port changes on restart?)
producer.close()
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
//Write 3 messages
(0 until 3).foreach { i =>
producer.send(new ProducerRecord(topic, 0, null, msgBigger))
producer.flush()}
brokers(0).shutdown()
brokers(1).startup()
//Bounce the producer (this is required, probably because the broker port changes on restart?)
producer.close()
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
//Write 1 message
(0 until 1).foreach { i =>
producer.send(new ProducerRecord(topic, 0, null, msg))
producer.flush()}
brokers(1).shutdown()
brokers(0).startup()
//Bounce the producer (this is required, probably because the broker port changes on restart?)
producer.close()
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
//Write 2 messages
(0 until 2).foreach { i =>
producer.send(new ProducerRecord(topic, 0, null, msgBigger))
producer.flush()}
printSegments()
brokers(1).startup()
waitForLogsToMatch(brokers(0), brokers(1))
printSegments()
def crcSeq(broker: KafkaServer, partition: Int = 0): Seq[Long] = {
val batches = getLog(broker, partition).activeSegment.read(0, None, Integer.MAX_VALUE)
.records.batches().asScala.toSeq
batches.map(_.checksum)
}
assertTrue(s"Logs on Broker 100 and Broker 101 should match",
crcSeq(brokers(0)) == crcSeq(brokers(1)))
}
private def log(leader: KafkaServer, follower: KafkaServer): Unit = { private def log(leader: KafkaServer, follower: KafkaServer): Unit = {
info(s"Bounce complete for follower ${follower.config.brokerId}") info(s"Bounce complete for follower ${follower.config.brokerId}")
info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries()) info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries())
@ -389,12 +470,13 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
brokers.filter(_.config.brokerId != leader)(0) brokers.filter(_.config.brokerId != leader)(0)
} }
private def createBroker(id: Int): KafkaServer = { private def createBroker(id: Int, enableUncleanLeaderElection: Boolean = false): KafkaServer = {
val config = createBrokerConfig(id, zkConnect) val config = createBrokerConfig(id, zkConnect)
if(!KIP_101_ENABLED) { if(!KIP_101_ENABLED) {
config.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_0_11_0_IV1.version) config.setProperty(KafkaConfig.InterBrokerProtocolVersionProp, KAFKA_0_11_0_IV1.version)
config.setProperty(KafkaConfig.LogMessageFormatVersionProp, KAFKA_0_11_0_IV1.version) config.setProperty(KafkaConfig.LogMessageFormatVersionProp, KAFKA_0_11_0_IV1.version)
} }
config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, enableUncleanLeaderElection.toString)
createServer(fromProps(config)) createServer(fromProps(config))
} }
@ -50,7 +50,7 @@ class LeaderEpochFileCacheTest {
//Then //Then
assertEquals(2, cache.latestEpoch()) assertEquals(2, cache.latestEpoch())
assertEquals(EpochEntry(2, 10), cache.epochEntries()(0)) assertEquals(EpochEntry(2, 10), cache.epochEntries()(0))
assertEquals(11, cache.endOffsetFor(2)) //should match leo assertEquals((2, leo), cache.endOffsetFor(2)) //should match leo
} }
@Test @Test
@ -67,23 +67,27 @@ class LeaderEpochFileCacheTest {
leo = 14 leo = 14
//Then //Then
assertEquals(14, cache.endOffsetFor(2)) assertEquals((2, leo), cache.endOffsetFor(2))
} }
@Test @Test
def shouldReturnUndefinedOffsetIfUndefinedEpochRequested() = { def shouldReturnUndefinedOffsetIfUndefinedEpochRequested() = {
def leoFinder() = new LogOffsetMetadata(0) def leoFinder() = new LogOffsetMetadata(0)
val expectedEpochEndOffset = (UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
//Given cache with some data on leader //Given cache with some data on leader
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
// assign couple of epochs
cache.assign(epoch = 2, offset = 11) cache.assign(epoch = 2, offset = 11)
cache.assign(epoch = 3, offset = 12) cache.assign(epoch = 3, offset = 12)
//When (say a bootstraping follower) sends request for UNDEFINED_EPOCH //When (say a bootstraping follower) sends request for UNDEFINED_EPOCH
val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH) val epochAndOffsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
//Then //Then
assertEquals(UNDEFINED_EPOCH_OFFSET, offsetFor) assertEquals("Expected undefined epoch and offset if undefined epoch requested. Cache not empty.",
expectedEpochEndOffset, epochAndOffsetFor)
} }
@Test @Test
@ -140,7 +144,7 @@ class LeaderEpochFileCacheTest {
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//Then //Then
assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(0)) assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(0))
} }
@Test @Test
@ -155,7 +159,8 @@ class LeaderEpochFileCacheTest {
val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH) val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH)
//Then //Then
assertEquals(UNDEFINED_EPOCH_OFFSET, offsetFor) assertEquals("Expected undefined epoch and offset if undefined epoch requested. Empty cache.",
(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), offsetFor)
} }
@Test @Test
@ -170,10 +175,10 @@ class LeaderEpochFileCacheTest {
cache.assign(epoch = 7, offset = 13) cache.assign(epoch = 7, offset = 13)
//When //When
val offset = cache.endOffsetFor(5 - 1) val epochAndOffset = cache.endOffsetFor(5 - 1)
//Then //Then
assertEquals(UNDEFINED_EPOCH_OFFSET, offset) assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), epochAndOffset)
} }
@Test @Test
@ -194,7 +199,7 @@ class LeaderEpochFileCacheTest {
leo = 17 leo = 17
//Then get the start offset of the next epoch //Then get the start offset of the next epoch
assertEquals(15, cache.endOffsetFor(2)) assertEquals((2, 15), cache.endOffsetFor(2))
} }
@Test @Test
@ -210,8 +215,9 @@ class LeaderEpochFileCacheTest {
cache.assign(epoch = 4, offset = 17) cache.assign(epoch = 4, offset = 17)
//Then //Then
assertEquals(13, cache.endOffsetFor(requestedEpoch = 1)) assertEquals((0, 13), cache.endOffsetFor(requestedEpoch = 1))
assertEquals(17, cache.endOffsetFor(requestedEpoch = 2)) assertEquals((2, 17), cache.endOffsetFor(requestedEpoch = 2))
assertEquals((2, 17), cache.endOffsetFor(requestedEpoch = 3))
} }
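To spell out the lookup rule these assertions rely on, here is a small standalone sketch (illustrative, not the LeaderEpochFileCache implementation; the epoch-0 start offset of 11 is made up because it does not affect the results): the cache returns the largest tracked epoch at or below the requested one, paired with the start offset of the next tracked epoch, falling back to the log end offset.

object EndOffsetForSketch extends App {
  val UndefinedEpoch = -1
  val UndefinedOffset = -1L

  // epoch -> start offset of that epoch; 2 -> 13L and 4 -> 17L mirror the offsets
  // the assertions above expect, 0 -> 11L is an invented placeholder.
  val entries: List[(Int, Long)] = List(0 -> 11L, 2 -> 13L, 4 -> 17L)
  val logEndOffset = 17L

  def endOffsetFor(requested: Int): (Int, Long) =
    entries.filter { case (epoch, _) => epoch <= requested }.lastOption match {
      case None => (UndefinedEpoch, UndefinedOffset)
      case Some((epoch, _)) =>
        // end offset = start offset of the next tracked epoch, else the log end offset
        val end = entries.collectFirst { case (e, start) if e > epoch => start }.getOrElse(logEndOffset)
        (epoch, end)
    }

  assert(endOffsetFor(1) == (0, 13L))
  assert(endOffsetFor(2) == (2, 17L))
  assert(endOffsetFor(3) == (2, 17L))
}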
@Test @Test
@ -242,7 +248,7 @@ class LeaderEpochFileCacheTest {
cache.assign(epoch = 2, offset = 100) cache.assign(epoch = 2, offset = 100)
//Then //Then
assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(3)) assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(3))
} }
@Test @Test
@ -258,7 +264,7 @@ class LeaderEpochFileCacheTest {
leo = 7 leo = 7
//Then //Then
assertEquals(leo, cache.endOffsetFor(2)) assertEquals((2, leo), cache.endOffsetFor(2))
assertEquals(1, cache.epochEntries.size) assertEquals(1, cache.epochEntries.size)
assertEquals(EpochEntry(2, 6), cache.epochEntries()(0)) assertEquals(EpochEntry(2, 6), cache.epochEntries()(0))
} }
@ -300,10 +306,10 @@ class LeaderEpochFileCacheTest {
assertEquals(2, cache.latestEpoch()) assertEquals(2, cache.latestEpoch())
//Then end offset for epoch 1 shouldn't have changed //Then end offset for epoch 1 shouldn't have changed
assertEquals(6, cache.endOffsetFor(1)) assertEquals((1, 6), cache.endOffsetFor(1))
//Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't thing of a better option) //Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't think of a better option)
assertEquals(8, cache.endOffsetFor(2)) assertEquals((2, 8), cache.endOffsetFor(2))
//Epoch history shouldn't have changed //Epoch history shouldn't have changed
assertEquals(EpochEntry(1, 5), cache.epochEntries()(0)) assertEquals(EpochEntry(1, 5), cache.epochEntries()(0))
@ -340,17 +346,17 @@ class LeaderEpochFileCacheTest {
//Then epoch should go up //Then epoch should go up
assertEquals(1, cache.latestEpoch()) assertEquals(1, cache.latestEpoch())
//offset for 1 should still be 0 //offset for 1 should still be 0
assertEquals(0, cache.endOffsetFor(1)) assertEquals((1, 0), cache.endOffsetFor(1))
//offset for epoch 0 should still be 0 //offset for epoch 0 should still be 0
assertEquals(0, cache.endOffsetFor(0)) assertEquals((0, 0), cache.endOffsetFor(0))
//When we write 5 messages as epoch 1 //When we write 5 messages as epoch 1
leo = 5 leo = 5
//Then end offset for epoch(1) should be leo => 5 //Then end offset for epoch(1) should be leo => 5
assertEquals(5, cache.endOffsetFor(1)) assertEquals((1, 5), cache.endOffsetFor(1))
//Epoch 0 should still be at offset 0 //Epoch 0 should still be at offset 0
assertEquals(0, cache.endOffsetFor(0)) assertEquals((0, 0), cache.endOffsetFor(0))
//When //When
cache.assign(epoch = 2, offset = 5) //leo=5 cache.assign(epoch = 2, offset = 5) //leo=5
@ -358,13 +364,13 @@ class LeaderEpochFileCacheTest {
leo = 10 //write another 5 messages leo = 10 //write another 5 messages
//Then end offset for epoch(2) should be leo => 10 //Then end offset for epoch(2) should be leo => 10
assertEquals(10, cache.endOffsetFor(2)) assertEquals((2, 10), cache.endOffsetFor(2))
//end offset for epoch(1) should be the start offset of epoch(2) => 5 //end offset for epoch(1) should be the start offset of epoch(2) => 5
assertEquals(5, cache.endOffsetFor(1)) assertEquals((1, 5), cache.endOffsetFor(1))
//epoch (0) should still be 0 //epoch (0) should still be 0
assertEquals(0, cache.endOffsetFor(0)) assertEquals((0, 0), cache.endOffsetFor(0))
} }
@Test @Test
@ -382,7 +388,7 @@ class LeaderEpochFileCacheTest {
//Then epoch should stay, offsets should grow //Then epoch should stay, offsets should grow
assertEquals(0, cache.latestEpoch()) assertEquals(0, cache.latestEpoch())
assertEquals(leo, cache.endOffsetFor(0)) assertEquals((0, leo), cache.endOffsetFor(0))
//When messages arrive with greater epoch //When messages arrive with greater epoch
cache.assign(epoch = 1, offset = 3); leo = 4 cache.assign(epoch = 1, offset = 3); leo = 4
@ -390,7 +396,7 @@ class LeaderEpochFileCacheTest {
cache.assign(epoch = 1, offset = 5); leo = 6 cache.assign(epoch = 1, offset = 5); leo = 6
assertEquals(1, cache.latestEpoch()) assertEquals(1, cache.latestEpoch())
assertEquals(leo, cache.endOffsetFor(1)) assertEquals((1, leo), cache.endOffsetFor(1))
//When //When
cache.assign(epoch = 2, offset = 6); leo = 7 cache.assign(epoch = 2, offset = 6); leo = 7
@ -398,11 +404,11 @@ class LeaderEpochFileCacheTest {
cache.assign(epoch = 2, offset = 8); leo = 9 cache.assign(epoch = 2, offset = 8); leo = 9
assertEquals(2, cache.latestEpoch()) assertEquals(2, cache.latestEpoch())
assertEquals(leo, cache.endOffsetFor(2)) assertEquals((2, leo), cache.endOffsetFor(2))
//Older epochs should return the start offset of the first message in the subsequent epoch. //Older epochs should return the start offset of the first message in the subsequent epoch.
assertEquals(3, cache.endOffsetFor(0)) assertEquals((0, 3), cache.endOffsetFor(0))
assertEquals(6, cache.endOffsetFor(1)) assertEquals((1, 6), cache.endOffsetFor(1))
} }
@Test @Test
@ -648,7 +654,7 @@ class LeaderEpochFileCacheTest {
val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
//Then //Then
assertEquals(-1, cache.endOffsetFor(7)) assertEquals((UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), cache.endOffsetFor(7))
} }
@Test @Test
@ -30,6 +30,7 @@ import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.{LogContext, SystemTime} import org.apache.kafka.common.utils.{LogContext, SystemTime}
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.Assert._ import org.junit.Assert._
import org.junit.{After, Test} import org.junit.{After, Test}
@ -265,7 +266,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
private[epoch] class TestFetcherThread(sender: BlockingSend) extends Logging { private[epoch] class TestFetcherThread(sender: BlockingSend) extends Logging {
def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
val request = new OffsetsForLeaderEpochRequest.Builder(toJavaFormat(partitions)) val request = new OffsetsForLeaderEpochRequest.Builder(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), toJavaFormat(partitions))
val response = sender.sendRequest(request) val response = sender.sendRequest(request)
response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
} }
@ -41,7 +41,7 @@ class OffsetsForLeaderEpochTest {
@Test @Test
def shouldGetEpochsFromReplica(): Unit = { def shouldGetEpochsFromReplica(): Unit = {
//Given //Given
val offset = 42 val epochAndOffset = (5, 42L)
val epochRequested: Integer = 5 val epochRequested: Integer = 5
val request = Map(tp -> epochRequested) val request = Map(tp -> epochRequested)
@ -49,7 +49,7 @@ class OffsetsForLeaderEpochTest {
val mockLog = createNiceMock(classOf[kafka.log.Log]) val mockLog = createNiceMock(classOf[kafka.log.Log])
val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache]) val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
val logManager = createNiceMock(classOf[kafka.log.LogManager]) val logManager = createNiceMock(classOf[kafka.log.LogManager])
expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset) expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset)
expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes() expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes() expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
replay(mockCache, mockLog, logManager) replay(mockCache, mockLog, logManager)
@ -67,7 +67,7 @@ class OffsetsForLeaderEpochTest {
val response = replicaManager.lastOffsetForLeaderEpoch(request) val response = replicaManager.lastOffsetForLeaderEpoch(request)
//Then //Then
assertEquals(new EpochEndOffset(Errors.NONE, offset), response(tp)) assertEquals(new EpochEndOffset(Errors.NONE, epochAndOffset._1, epochAndOffset._2), response(tp))
} }
@Test @Test
@ -90,7 +90,7 @@ class OffsetsForLeaderEpochTest {
val response = replicaManager.lastOffsetForLeaderEpoch(request) val response = replicaManager.lastOffsetForLeaderEpoch(request)
//Then //Then
assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH_OFFSET), response(tp)) assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp))
} }
@Test @Test
@ -112,6 +112,6 @@ class OffsetsForLeaderEpochTest {
val response = replicaManager.lastOffsetForLeaderEpoch(request) val response = replicaManager.lastOffsetForLeaderEpoch(request)
//Then //Then
assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH_OFFSET), response(tp)) assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp))
} }
} }
@ -28,17 +28,28 @@ import org.apache.kafka.common.utils.{SystemTime, Time}
/** /**
* Stub network client used for testing the ReplicaFetcher, wraps the MockClient used for consumer testing * Stub network client used for testing the ReplicaFetcher, wraps the MockClient used for consumer testing
*
* The common case is that there is only one OFFSET_FOR_LEADER_EPOCH request/response, so the
* response to OFFSET_FOR_LEADER_EPOCH is the 'offsets' map. If a test needs another round of
* OFFSET_FOR_LEADER_EPOCH with different offsets in the response, it should update the offsets
* via setOffsetsForNextResponse.
*/ */
class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset], destination: BrokerEndPoint, time: Time) extends BlockingSend { class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset], destination: BrokerEndPoint, time: Time) extends BlockingSend {
private val client = new MockClient(new SystemTime) private val client = new MockClient(new SystemTime)
var fetchCount = 0 var fetchCount = 0
var epochFetchCount = 0 var epochFetchCount = 0
var lastUsedOffsetForLeaderEpochVersion = -1
var callback: Option[() => Unit] = None var callback: Option[() => Unit] = None
var currentOffsets: java.util.Map[TopicPartition, EpochEndOffset] = offsets
def setEpochRequestCallback(postEpochFunction: () => Unit){ def setEpochRequestCallback(postEpochFunction: () => Unit){
callback = Some(postEpochFunction) callback = Some(postEpochFunction)
} }
def setOffsetsForNextResponse(newOffsets: java.util.Map[TopicPartition, EpochEndOffset]): Unit = {
currentOffsets = newOffsets
}
override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = { override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
//Send the request to the mock client //Send the request to the mock client
@ -50,7 +61,8 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
callback.foreach(_.apply()) callback.foreach(_.apply())
epochFetchCount += 1 epochFetchCount += 1
new OffsetsForLeaderEpochResponse(offsets) lastUsedOffsetForLeaderEpochVersion = requestBuilder.latestAllowedVersion()
new OffsetsForLeaderEpochResponse(currentOffsets)
case ApiKeys.FETCH => case ApiKeys.FETCH =>
fetchCount += 1 fetchCount += 1
@ -75,4 +87,4 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
} }
override def close(): Unit = {} override def close(): Unit = {}
} }
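A hypothetical usage fragment for the stub above (the partition name, epochs and offsets are invented; the imports match the ones used in the fetcher thread tests): seed one canned OFFSET_FOR_LEADER_EPOCH reply at construction time, then swap in the next round's reply before the second fetcher iteration.

object MockBlockingSendUsageSketch extends App {
  import kafka.cluster.BrokerEndPoint
  import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
  import org.apache.kafka.common.TopicPartition
  import org.apache.kafka.common.requests.EpochEndOffset
  import org.apache.kafka.common.utils.SystemTime
  import scala.collection.JavaConverters._

  val tp = new TopicPartition("topic1", 0)
  // First round: the leader claims epoch 4 ends at offset 155.
  val firstReply = Map(tp -> new EpochEndOffset(4, 155)).asJava
  val mockSend = new ReplicaFetcherMockBlockingSend(firstReply, new BrokerEndPoint(0, "localhost", 1000), new SystemTime())
  // ... run one fetcher iteration against mockSend here ...
  // Second round: swap the canned reply so the follower's follow-up request sees epoch 3.
  mockSend.setOffsetsForNextResponse(Map(tp -> new EpochEndOffset(3, 101)).asJava)
  // ... the next OFFSET_FOR_LEADER_EPOCH request is answered with (epoch 3, offset 101) ...
}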
@ -19,6 +19,58 @@
<script id="upgrade-template" type="text/x-handlebars-template"> <script id="upgrade-template" type="text/x-handlebars-template">
<h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 1.2.x to 2.0.0</a></h4>
<p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_200_notable">notable changes in 2.0.0</a> before upgrading.
</p>
<p><b>For a rolling upgrade:</b></p>
<ol>
<li> Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. (A concrete server.properties sketch follows this list.)
<ul>
<li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 0.10.0, 0.10.1, 0.10.2, 0.11.0, 1.0, 1.1, 1.2).</li>
<li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See <a href="#upgrade_10_performance_impact">potential performance impact
following the upgrade</a> for the details on what this configuration does.)</li>
</ul>
If you are upgrading from 0.11.0.x, 1.0.x, 1.1.x or 1.2.x and you have not overridden the message format, then you only need to override
the inter-broker protocol version.
<ul>
<li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (0.11.0, 1.0, 1.1, 1.2).</li>
</ul>
</li>
<li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. </li>
<li> Once the entire cluster is upgraded, bump the protocol version by editing <code>inter.broker.protocol.version</code> and setting it to 2.0.</li>
<li> Restart the brokers one by one for the new protocol version to take effect.</li>
<li> If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
change log.message.format.version to 2.0 on each broker and restart them one by one. Note that the older Scala consumer
does not support the new message format introduced in 0.11, so to avoid the performance cost of down-conversion (or to
take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly once semantics</a>), the newer Java consumer must be used.</li>
</ol>
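For instance, a server.properties sketch of the two bounces described above, assuming a cluster currently on 1.1 with the default message format (the version numbers are placeholders for whatever you are actually running, not values prescribed by this change):

# First rolling bounce (step 1): pin both versions to the release you are upgrading from
inter.broker.protocol.version=1.1
log.message.format.version=1.1

# Second rolling bounce (steps 3-4): raise only the protocol version; the message
# format can be raised later, as described in step 5
inter.broker.protocol.version=2.0
log.message.format.version=1.1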
<p><b>Additional Upgrade Notes:</b></p>
<ol>
<li>If you are willing to accept downtime, you can simply take all the brokers down, update the code and start them back up. They will start
with the new protocol by default.</li>
<li>Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
Similarly for the message format version.</li>
<li>If you are using Java 8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguities.
Hot-swapping the jar-file only might not work.</li>
</ol>
<h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2.0.0</a></h5>
<ul>
</ul>
<h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>
<ul>
<li> <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over">KIP-279</a>: OffsetsForLeaderEpochResponse v1 introduces a partition-level <code>leader_epoch</code> field. </li>
</ul>
<h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.x</a></h4> <h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.x</a></h4>
<p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below, <p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,