KAFKA-8443; Broker support for fetch from followers (#6832)

A follow-up to #6731, this PR adds broker-side support for [KIP-392](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica) (fetch from followers).

Changes:
* All brokers will handle FetchRequest regardless of leadership
* Leaders can compute a preferred replica to return to the client
* New ReplicaSelector interface for determining the preferred replica (a minimal example implementation follows this list)
* Incremental fetches will include partitions with no records if the preferred replica has been computed
* Adds a new JMX attribute exposing the consumer's current preferred read replica for a partition
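
As a rough illustration of the new interface (not part of this PR), a custom selector built against `ReplicaSelector`, `PartitionView`, and `ReplicaView` might look like the sketch below; the class name is hypothetical, and a broker would plug it in via the `replica.selector.class` config introduced in this change.

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.replica.ClientMetadata;
import org.apache.kafka.common.replica.PartitionView;
import org.apache.kafka.common.replica.ReplicaSelector;
import org.apache.kafka.common.replica.ReplicaView;

import java.util.Optional;

// Hypothetical example: always prefer the most caught-up replica, ignoring rack information.
public class MostCaughtUpReplicaSelector implements ReplicaSelector {
    @Override
    public Optional<ReplicaView> select(TopicPartition topicPartition,
                                        ClientMetadata clientMetadata,
                                        PartitionView partitionView) {
        // ReplicaView.comparator() orders replicas by "most caught up";
        // fall back to the leader if the replica set is somehow empty.
        Optional<ReplicaView> mostCaughtUp = partitionView.replicas().stream()
                .max(ReplicaView.comparator());
        return mostCaughtUp.isPresent() ? mostCaughtUp : Optional.of(partitionView.leader());
    }
}
```

The built-in `RackAwareReplicaSelector` added below follows the same pattern, but first filters replicas by the rack ID carried in the client metadata.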

Two new conditions were added for completing a delayed fetch. Both relate to propagating the high watermark to followers without waiting for a timeout (see the sketch after this list):
* For regular fetches, if the high watermark changes within a single fetch request
* For incremental fetch sessions, if the follower's high watermark is lower than the leader's
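
As an illustrative restatement in Java (the real checks live in the Scala `DelayedFetch`/`ReplicaManager` changes below), the new follower case boils down to comparing the leader's current high watermark with the last high watermark sent to that follower.

```java
final class HwPropagationCheck {
    // Sketch only: a delayed follower fetch should be completed as soon as the leader's
    // high watermark moves past the last high watermark that was sent to that follower,
    // so the HW propagates without waiting for the fetch timeout.
    static boolean shouldForceComplete(boolean isFromFollower,
                                       long leaderHighWatermark,
                                       long lastSentHighWatermarkToFollower) {
        return isFromFollower && leaderHighWatermark > lastSentHighWatermarkToFollower;
    }
}
```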

A new JMX attribute `preferred-read-replica` was added to the `kafka.consumer:type=consumer-fetch-manager-metrics,client-id=some-consumer,topic=my-topic,partition=0` object. It supports the new system test, which verifies that fetch-from-follower behavior works end-to-end, and may also be useful when debugging consumer issues.
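
Besides JMX, the gauge can be read in-process from the consumer's metrics map; the helper below is an illustrative sketch (the class and method names are mine, not part of this PR).

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

import java.util.Map;

public final class PreferredReplicaMetric {
    /** Returns the consumer's current preferred read replica for a partition, or -1 (leader). */
    public static int preferredReadReplica(KafkaConsumer<?, ?> consumer, String topic, int partition) {
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            // The metric tags use the topic name with '.' replaced by '_', as in Fetcher#topicPartitionTags.
            if ("preferred-read-replica".equals(name.name())
                    && topic.replace('.', '_').equals(name.tags().get("topic"))
                    && String.valueOf(partition).equals(name.tags().get("partition"))) {
                return ((Number) entry.getValue().metricValue()).intValue();
            }
        }
        return -1; // metric not registered yet for this partition
    }
}
```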

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>
David Arthur 2019-07-04 11:18:51 -04:00 committed by Jason Gustafson
parent 41e1c13a52
commit 23beeea34b
34 changed files with 1235 additions and 133 deletions

View File

@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@ -42,6 +43,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@ -1706,8 +1708,20 @@ public class Fetcher<K, V> implements Closeable {
if (!newAssignedPartitions.contains(tp)) {
metrics.removeSensor(partitionLagMetricName(tp));
metrics.removeSensor(partitionLeadMetricName(tp));
metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
}
}
for (TopicPartition tp : newAssignedPartitions) {
if (!this.assignedPartitions.contains(tp)) {
MetricName metricName = partitionPreferredReadReplicaMetricName(tp);
if (metrics.metric(metricName) == null) {
metrics.addMetric(metricName, (Gauge<Integer>) (config, now) ->
subscription.preferredReadReplica(tp, 0L).orElse(-1));
}
}
}
this.assignedPartitions = newAssignedPartitions;
this.assignmentId = newAssignmentId;
}
@ -1719,9 +1733,7 @@ public class Fetcher<K, V> implements Closeable {
String name = partitionLeadMetricName(tp);
Sensor recordsLead = this.metrics.getSensor(name);
if (recordsLead == null) {
Map<String, String> metricTags = new HashMap<>(2);
metricTags.put("topic", tp.topic().replace('.', '_'));
metricTags.put("partition", String.valueOf(tp.partition()));
Map<String, String> metricTags = topicPartitionTags(tp);
recordsLead = this.metrics.sensor(name);
@ -1738,10 +1750,7 @@ public class Fetcher<K, V> implements Closeable {
String name = partitionLagMetricName(tp);
Sensor recordsLag = this.metrics.getSensor(name);
if (recordsLag == null) {
Map<String, String> metricTags = new HashMap<>(2);
metricTags.put("topic", tp.topic().replace('.', '_'));
metricTags.put("partition", String.valueOf(tp.partition()));
Map<String, String> metricTags = topicPartitionTags(tp);
recordsLag = this.metrics.sensor(name);
recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, metricTags), new Value());
@ -1759,6 +1768,17 @@ public class Fetcher<K, V> implements Closeable {
return tp + ".records-lead";
}
private MetricName partitionPreferredReadReplicaMetricName(TopicPartition tp) {
Map<String, String> metricTags = topicPartitionTags(tp);
return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags);
}
private Map<String, String> topicPartitionTags(TopicPartition tp) {
Map<String, String> metricTags = new HashMap<>(2);
metricTags.put("topic", tp.topic().replace('.', '_'));
metricTags.put("partition", String.valueOf(tp.partition()));
return metricTags;
}
}
@Override

View File

@ -54,6 +54,7 @@ public class FetcherMetricsRegistry {
public MetricNameTemplate partitionRecordsLead;
public MetricNameTemplate partitionRecordsLeadMin;
public MetricNameTemplate partitionRecordsLeadAvg;
public MetricNameTemplate partitionPreferredReadReplica;
public FetcherMetricsRegistry() {
this(new HashSet<String>(), "");
@ -139,7 +140,9 @@ public class FetcherMetricsRegistry {
"The min lead of the partition", partitionTags);
this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
"The average lead of the partition", partitionTags);
this.partitionPreferredReadReplica = new MetricNameTemplate(
"preferred-read-replica", "consumer-fetch-manager-metrics",
"The current read replica for the partition, or -1 if reading from leader", partitionTags);
}
public List<MetricNameTemplate> getAllTemplates() {
@ -171,7 +174,8 @@ public class FetcherMetricsRegistry {
partitionRecordsLagMax,
partitionRecordsLead,
partitionRecordsLeadMin,
partitionRecordsLeadAvg
partitionRecordsLeadAvg,
partitionPreferredReadReplica
);
}

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.net.InetAddress;
import java.util.Objects;
/**
* Holder for all the client metadata required to determine a preferred replica.
*/
public interface ClientMetadata {
/**
* Rack ID sent by the client
*/
String rackId();
/**
* Client ID sent by the client
*/
String clientId();
/**
* Incoming address of the client
*/
InetAddress clientAddress();
/**
* Security principal of the client
*/
KafkaPrincipal principal();
/**
* Listener name for the client
*/
String listenerName();
class DefaultClientMetadata implements ClientMetadata {
private final String rackId;
private final String clientId;
private final InetAddress clientAddress;
private final KafkaPrincipal principal;
private final String listenerName;
public DefaultClientMetadata(String rackId, String clientId, InetAddress clientAddress,
KafkaPrincipal principal, String listenerName) {
this.rackId = rackId;
this.clientId = clientId;
this.clientAddress = clientAddress;
this.principal = principal;
this.listenerName = listenerName;
}
@Override
public String rackId() {
return rackId;
}
@Override
public String clientId() {
return clientId;
}
@Override
public InetAddress clientAddress() {
return clientAddress;
}
@Override
public KafkaPrincipal principal() {
return principal;
}
@Override
public String listenerName() {
return listenerName;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultClientMetadata that = (DefaultClientMetadata) o;
return Objects.equals(rackId, that.rackId) &&
Objects.equals(clientId, that.clientId) &&
Objects.equals(clientAddress, that.clientAddress) &&
Objects.equals(principal, that.principal) &&
Objects.equals(listenerName, that.listenerName);
}
@Override
public int hashCode() {
return Objects.hash(rackId, clientId, clientAddress, principal, listenerName);
}
@Override
public String toString() {
return "DefaultClientMetadata{" +
"rackId='" + rackId + '\'' +
", clientId='" + clientId + '\'' +
", clientAddress=" + clientAddress +
", principal=" + principal +
", listenerName='" + listenerName + '\'' +
'}';
}
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
/**
* View of a partition used by {@link ReplicaSelector} to determine a preferred replica.
*/
public interface PartitionView {
Set<ReplicaView> replicas();
ReplicaView leader();
class DefaultPartitionView implements PartitionView {
private final Set<ReplicaView> replicas;
private final ReplicaView leader;
public DefaultPartitionView(Set<ReplicaView> replicas, ReplicaView leader) {
this.replicas = Collections.unmodifiableSet(replicas);
this.leader = leader;
}
@Override
public Set<ReplicaView> replicas() {
return replicas;
}
@Override
public ReplicaView leader() {
return leader;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultPartitionView that = (DefaultPartitionView) o;
return Objects.equals(replicas, that.replicas) &&
Objects.equals(leader, that.leader);
}
@Override
public int hashCode() {
return Objects.hash(replicas, leader);
}
@Override
public String toString() {
return "DefaultPartitionView{" +
"replicas=" + replicas +
", leader=" + leader +
'}';
}
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import org.apache.kafka.common.TopicPartition;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Returns a replica whose rack id is equal to the rack id specified in the client request metadata. If no such replica
* is found, returns the leader.
*/
public class RackAwareReplicaSelector implements ReplicaSelector {
@Override
public Optional<ReplicaView> select(TopicPartition topicPartition,
ClientMetadata clientMetadata,
PartitionView partitionView) {
if (clientMetadata.rackId() != null && !clientMetadata.rackId().isEmpty()) {
Set<ReplicaView> sameRackReplicas = partitionView.replicas().stream()
.filter(replicaInfo -> clientMetadata.rackId().equals(replicaInfo.endpoint().rack()))
.collect(Collectors.toSet());
if (sameRackReplicas.isEmpty()) {
return Optional.of(partitionView.leader());
} else {
if (sameRackReplicas.contains(partitionView.leader())) {
// Use the leader if it's in this rack
return Optional.of(partitionView.leader());
} else {
// Otherwise, get the most caught-up replica
return sameRackReplicas.stream().max(ReplicaView.comparator());
}
}
} else {
return Optional.of(partitionView.leader());
}
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
/**
* Plug-able interface for selecting a preferred read replica given the current set of replicas for a partition
* and metadata from the client.
*/
public interface ReplicaSelector extends Configurable, Closeable {
/**
* Select the preferred replica a client should use for fetching. If no replica is available, this will return an
* empty optional.
*/
Optional<ReplicaView> select(TopicPartition topicPartition,
ClientMetadata clientMetadata,
PartitionView partitionView);
@Override
default void close() throws IOException {
// No-op by default
}
@Override
default void configure(Map<String, ?> configs) {
// No-op by default
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import org.apache.kafka.common.Node;
import java.util.Comparator;
import java.util.Objects;
/**
* View of a replica used by {@link ReplicaSelector} to determine a preferred replica.
*/
public interface ReplicaView {
/**
* The endpoint information for this replica (hostname, port, rack, etc)
*/
Node endpoint();
/**
* The log end offset for this replica
*/
long logEndOffset();
/**
* The number of milliseconds (if any) since the last time this replica was caught up to the high watermark.
* For a leader replica, this is always zero.
*/
long timeSinceLastCaughtUpMs();
/**
* Comparator for ReplicaView that returns in the order of "most caught up". This is used for deterministic
* selection of a replica when there is a tie from a selector.
*/
static Comparator<ReplicaView> comparator() {
return Comparator.comparingLong(ReplicaView::logEndOffset)
.thenComparing(Comparator.comparingLong(ReplicaView::timeSinceLastCaughtUpMs).reversed())
.thenComparing(replicaInfo -> replicaInfo.endpoint().id());
}
class DefaultReplicaView implements ReplicaView {
private final Node endpoint;
private final long logEndOffset;
private final long timeSinceLastCaughtUpMs;
public DefaultReplicaView(Node endpoint, long logEndOffset, long timeSinceLastCaughtUpMs) {
this.endpoint = endpoint;
this.logEndOffset = logEndOffset;
this.timeSinceLastCaughtUpMs = timeSinceLastCaughtUpMs;
}
@Override
public Node endpoint() {
return endpoint;
}
@Override
public long logEndOffset() {
return logEndOffset;
}
@Override
public long timeSinceLastCaughtUpMs() {
return timeSinceLastCaughtUpMs;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultReplicaView that = (DefaultReplicaView) o;
return logEndOffset == that.logEndOffset &&
Objects.equals(endpoint, that.endpoint) &&
Objects.equals(timeSinceLastCaughtUpMs, that.timeSinceLastCaughtUpMs);
}
@Override
public int hashCode() {
return Objects.hash(endpoint, logEndOffset, timeSinceLastCaughtUpMs);
}
@Override
public String toString() {
return "DefaultReplicaView{" +
"endpoint=" + endpoint +
", logEndOffset=" + logEndOffset +
", timeSinceLastCaughtUpMs=" + timeSinceLastCaughtUpMs +
'}';
}
}
}

View File

@ -195,6 +195,7 @@ public class FetchRequest extends AbstractRequest {
// V10 bumped up to indicate ZStandard capability. (see KIP-110)
private static final Schema FETCH_REQUEST_V10 = FETCH_REQUEST_V9;
// V11 added rack ID to support read from followers (KIP-392)
private static final Schema FETCH_REQUEST_V11 = new Schema(
REPLICA_ID,
MAX_WAIT_TIME,

View File

@ -144,6 +144,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
LOG_START_OFFSET,
new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
// Introduced in V11 to support read from followers (KIP-392)
private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V6 = new Schema(
PARTITION_ID,
ERROR_CODE,
@ -207,6 +208,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
// V10 bumped up to indicate ZStandard capability. (see KIP-110)
private static final Schema FETCH_RESPONSE_V10 = FETCH_RESPONSE_V9;
// V11 added preferred read replica for each partition response to support read from followers (KIP-392)
private static final Schema FETCH_RESPONSE_V11 = new Schema(
THROTTLE_TIME_MS,
ERROR_CODE,
@ -329,7 +331,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
result = 31 * result + Long.hashCode(highWatermark);
result = 31 * result + Long.hashCode(lastStableOffset);
result = 31 * result + Long.hashCode(logStartOffset);
result = 31 * result + (preferredReadReplica != null ? preferredReadReplica.hashCode() : 0);
result = 31 * result + Objects.hashCode(preferredReadReplica);
result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0);
result = 31 * result + (records != null ? records.hashCode() : 0);
return result;

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.junit.Test;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.test.TestUtils.assertOptional;
import static org.junit.Assert.assertEquals;
public class ReplicaSelectorTest {
@Test
public void testSameRackSelector() {
TopicPartition tp = new TopicPartition("test", 0);
List<ReplicaView> replicaViewSet = replicaInfoSet();
ReplicaView leader = replicaViewSet.get(0);
PartitionView partitionView = partitionInfo(new HashSet<>(replicaViewSet), leader);
ReplicaSelector selector = new RackAwareReplicaSelector();
Optional<ReplicaView> selected = selector.select(tp, metadata("rack-b"), partitionView);
assertOptional(selected, replicaInfo -> {
assertEquals("Expect replica to be in rack-b", replicaInfo.endpoint().rack(), "rack-b");
assertEquals("Expected replica 3 since it is more caught-up", replicaInfo.endpoint().id(), 3);
});
selected = selector.select(tp, metadata("not-a-rack"), partitionView);
assertOptional(selected, replicaInfo -> {
assertEquals("Expect leader when we can't find any nodes in given rack", replicaInfo, leader);
});
selected = selector.select(tp, metadata("rack-a"), partitionView);
assertOptional(selected, replicaInfo -> {
assertEquals("Expect replica to be in rack-a", replicaInfo.endpoint().rack(), "rack-a");
assertEquals("Expect the leader since it's in rack-a", replicaInfo, leader);
});
}
static List<ReplicaView> replicaInfoSet() {
return Stream.of(
replicaInfo(new Node(0, "host0", 1234, "rack-a"), 4, 0),
replicaInfo(new Node(1, "host1", 1234, "rack-a"), 2, 5),
replicaInfo(new Node(2, "host2", 1234, "rack-b"), 3, 3),
replicaInfo(new Node(3, "host3", 1234, "rack-b"), 4, 2)
).collect(Collectors.toList());
}
static ReplicaView replicaInfo(Node node, long logOffset, long timeSinceLastCaughtUpMs) {
return new ReplicaView.DefaultReplicaView(node, logOffset, timeSinceLastCaughtUpMs);
}
static PartitionView partitionInfo(Set<ReplicaView> replicaViewSet, ReplicaView leader) {
return new PartitionView.DefaultPartitionView(replicaViewSet, leader);
}
static ClientMetadata metadata(String rack) {
return new ClientMetadata.DefaultClientMetadata(rack, "test-client",
InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, "TEST");
}
}

View File

@ -519,16 +519,18 @@ class Partition(val topicPartition: TopicPartition,
if (isNewLeader) {
// construct the high watermark metadata for the new leader replica
leaderLog.maybeFetchHighWatermarkOffsetMetadata()
leaderLog.initializeHighWatermarkOffsetMetadata()
// mark local replica as the leader after converting hw
leaderReplicaIdOpt = Some(localBrokerId)
// reset log end offset for remote replicas
remoteReplicas.foreach { _.updateFetchState(
remoteReplicas.foreach { replica =>
replica.updateFetchState(
followerFetchOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
followerStartOffset = Log.UnknownOffset,
followerFetchTimeMs = 0L,
leaderEndOffset = Log.UnknownOffset
)
replica.updateLastSentHighWatermark(0L)
}
}
// we may need to increment high watermark since ISR could be down to 1

View File

@ -42,6 +42,10 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
// the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
@volatile private[this] var _lastCaughtUpTimeMs = 0L
// highWatermark is the leader's high watermark after the most recent FetchRequest from this follower. This is
// used to determine the maximum HW this follower knows about. See KIP-392
@volatile private[this] var _lastSentHighWatermark = 0L
def logStartOffset: Long = _logStartOffset
def logEndOffsetMetadata: LogOffsetMetadata = _logEndOffsetMetadata
@ -50,6 +54,8 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
def lastSentHighWatermark: Long = _lastSentHighWatermark
/*
* If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
* set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
@ -78,6 +84,19 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
trace(s"Updated state of replica to $this")
}
/**
* Update the high watermark of this remote replica. This is used to track what we think is the last known HW to
* a remote follower. Since this is recorded when we send a response, there is no way to guarantee that the follower
* actually receives this HW. So we consider this to be an upper bound on what the follower knows.
*
* When handling fetches, the last sent high watermark for a replica is checked to see if we should return immediately
* in order to propagate the HW more expeditiously. See KIP-392
*/
def updateLastSentHighWatermark(highWatermark: Long): Unit = {
_lastSentHighWatermark = highWatermark
trace(s"Updated HW of replica to $highWatermark")
}
def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long): Unit = {
lastFetchLeaderLogEndOffset = curLeaderLogEndOffset
lastFetchTimeMs = curTimeMs
@ -96,6 +115,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
replicaString.append(s", logEndOffsetMetadata=$logEndOffsetMetadata")
replicaString.append(s", lastFetchLeaderLogEndOffset=$lastFetchLeaderLogEndOffset")
replicaString.append(s", lastFetchTimeMs=$lastFetchTimeMs")
replicaString.append(s", lastSentHighWatermark=$lastSentHighWatermark")
replicaString.append(")")
replicaString.toString
}

View File

@ -328,12 +328,14 @@ class Log(@volatile var dir: File,
* Convert hw to local offset metadata by reading the log at the hw offset.
* If the hw offset is out of range, return the first offset of the first log segment as the offset metadata.
*/
def maybeFetchHighWatermarkOffsetMetadata(): Unit = {
def initializeHighWatermarkOffsetMetadata(): Unit = {
if (highWatermarkMetadata.messageOffsetOnly) {
highWatermarkMetadata = convertToOffsetMetadata(highWatermark).getOrElse {
convertToOffsetMetadata(logStartOffset).getOrElse {
val firstSegmentOffset = logSegments.head.baseOffset
LogOffsetMetadata(firstSegmentOffset, firstSegmentOffset, 0)
lock.synchronized {
highWatermarkMetadata = convertToOffsetMetadata(highWatermark).getOrElse {
convertToOffsetMetadata(logStartOffset).getOrElse {
val firstSegmentOffset = logSegments.head.baseOffset
LogOffsetMetadata(firstSegmentOffset, firstSegmentOffset, 0)
}
}
}
}
@ -357,12 +359,40 @@ class Log(@volatile var dir: File,
def lastStableOffsetLag: Long = highWatermark - lastStableOffset
/**
* Fully materialize and return an offset snapshot including segment position info. This method will update
* the LogOffsetMetadata for the high watermark and last stable offset if they are message-only. Throws an
* offset out of range error if the segment info cannot be loaded.
*/
def offsetSnapshot: LogOffsetSnapshot = {
var highWatermark = _highWatermarkMetadata
if (highWatermark.messageOffsetOnly) {
lock.synchronized {
val fullOffset = convertToOffsetMetadataOrThrow(_highWatermarkMetadata.messageOffset)
_highWatermarkMetadata = fullOffset
highWatermark = _highWatermarkMetadata
}
}
var lastStable: LogOffsetMetadata = lastStableOffsetMetadata
if (lastStable.messageOffsetOnly) {
lock synchronized {
firstUnstableOffset match {
case None => highWatermark
case Some(offsetMetadata) =>
val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
firstUnstableOffset = Some(fullOffset)
lastStable = fullOffset
}
}
}
LogOffsetSnapshot(
logStartOffset = logStartOffset,
logEndOffset = logEndOffsetMetadata,
highWatermark = highWatermarkMetadata,
lastStableOffset = lastStableOffsetMetadata)
logStartOffset,
logEndOffsetMetadata,
highWatermark,
lastStable
)
}
private val tags = {
@ -1534,17 +1564,25 @@ class Log(@volatile var dir: File,
*/
def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
try {
val fetchDataInfo = read(offset,
maxLength = 1,
maxOffset = None,
minOneMessage = false,
includeAbortedTxns = false)
Some(fetchDataInfo.fetchOffsetMetadata)
Some(convertToOffsetMetadataOrThrow(offset))
} catch {
case _: OffsetOutOfRangeException => None
}
}
/**
* Given a message offset, find its corresponding offset metadata in the log.
* If the message offset is out of range, throw an OffsetOutOfRangeException
*/
def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
val fetchDataInfo = read(offset,
maxLength = 1,
maxOffset = None,
minOneMessage = false,
includeAbortedTxns = false)
fetchDataInfo.fetchOffsetMetadata
}
/**
* Delete any log segments matching the given predicate function,
* starting with the oldest segment and moving forward until a segment doesn't match.

View File

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import scala.collection._
@ -59,6 +60,7 @@ class DelayedFetch(delayMs: Long,
fetchMetadata: FetchMetadata,
replicaManager: ReplicaManager,
quota: ReplicaQuota,
clientMetadata: Option[ClientMetadata],
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit)
extends DelayedOperation(delayMs) {
@ -71,7 +73,7 @@ class DelayedFetch(delayMs: Long,
* Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
* Case E: The partition is in an offline log directory on this broker
* Case F: This broker is the leader, but the requested epoch is now fenced
*
* Case G: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
* Upon completion, should return whatever data is available for each valid partition
*/
override def tryComplete(): Boolean = {
@ -114,6 +116,14 @@ class DelayedFetch(delayMs: Long,
accumulatedSize += bytesAvailable
}
}
if (fetchMetadata.isFromFollower) {
// Case G check if the follower has the latest HW from the leader
if (partition.getReplica(fetchMetadata.replicaId)
.exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
return forceComplete()
}
}
}
} catch {
case _: KafkaStorageException => // Case E
@ -157,11 +167,12 @@ class DelayedFetch(delayMs: Long,
fetchMaxBytes = fetchMetadata.fetchMaxBytes,
hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
clientMetadata = clientMetadata,
quota = quota)
val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
result.lastStableOffset, result.info.abortedTransactions)
result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica)
}
responseCallback(fetchPartitionData)

View File

@ -144,6 +144,10 @@ class CachedPartition(val topic: String,
if (updateResponseData)
localLogStartOffset = respData.logStartOffset
}
if (respData.preferredReadReplica.isPresent) {
// If the broker computed a preferred read replica, we need to include it in the response
mustRespond = true
}
if (respData.error.code != 0) {
// Partitions with errors are always included in the response.
// We also set the cached highWatermark to an invalid offset, -1.

View File

@ -69,6 +69,8 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
@ -82,11 +84,13 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
import scala.compat.java8.OptionConverters._
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
/**
* Logic to handle the various Kafka requests
*/
@ -577,6 +581,18 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchRequest.toForget,
fetchRequest.isFromFollower)
val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) {
// Fetch API version 11 added preferred replica logic
Some(new DefaultClientMetadata(
fetchRequest.rackId,
clientId,
request.context.clientAddress,
request.context.principal,
request.context.listenerName.value))
} else {
None
}
def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = {
new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
@ -650,7 +666,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
// client.
new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
partitionData.lastStableOffset, partitionData.logStartOffset,
partitionData.preferredReadReplica, partitionData.abortedTransactions,
new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
} catch {
case e: UnsupportedCompressionTypeException =>
@ -659,7 +676,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
case None => new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
partitionData.lastStableOffset, partitionData.logStartOffset,
partitionData.preferredReadReplica, partitionData.abortedTransactions,
unconvertedRecords)
}
}
@ -672,7 +690,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
partitions.put(tp, new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset,
data.logStartOffset, abortedTransactions, data.records))
data.logStartOffset, data.preferredReadReplica.map(int2Integer).asJava,
abortedTransactions, data.records))
}
erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
@ -769,7 +788,8 @@ class KafkaApis(val requestChannel: RequestChannel,
interesting,
replicationQuota(fetchRequest),
processResponseCallback,
fetchRequest.isolationLevel)
fetchRequest.isolationLevel,
clientMetadata)
}
}

View File

@ -370,6 +370,7 @@ object KafkaConfig {
val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol"
val InterBrokerProtocolVersionProp = "inter.broker.protocol.version"
val InterBrokerListenerNameProp = "inter.broker.listener.name"
val ReplicaSelectorClassProp = "replica.selector.class"
/** ********* Controlled shutdown configuration ***********/
val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
@ -699,6 +700,7 @@ object KafkaConfig {
" Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list."
val InterBrokerListenerNameDoc = s"Name of listener used for communication between brokers. If this is unset, the listener name is defined by $InterBrokerSecurityProtocolProp. " +
s"It is an error to set this and $InterBrokerSecurityProtocolProp properties at the same time."
val ReplicaSelectorClassDoc = "The fully qualified class name that implements ReplicaSelector. This is used by the broker to find the preferred read replica. By default, we use an implementation that returns the leader."
/** ********* Controlled shutdown configuration ***********/
val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens"
val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying."
@ -966,6 +968,7 @@ object KafkaConfig {
.define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
.define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, ApiVersionValidator, MEDIUM, InterBrokerProtocolVersionDoc)
.define(InterBrokerListenerNameProp, STRING, null, MEDIUM, InterBrokerListenerNameDoc)
.define(ReplicaSelectorClassProp, STRING, null, MEDIUM, ReplicaSelectorClassDoc)
/** ********* Controlled shutdown configuration ***********/
.define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
@ -1186,6 +1189,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/***************** rack configuration **************/
val rack = Option(getString(KafkaConfig.RackProp))
val replicaSelectorClassName = Option(getString(KafkaConfig.ReplicaSelectorClassProp))
/** ********* Log Configuration ***********/
val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
/**
* A cache for the state (e.g., current leader) of each partition. This cache is updated through
* UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
@ -195,6 +196,24 @@ class MetadataCache(brokerId: Int) extends Logging {
}
}
def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
val snapshot = metadataSnapshot
snapshot.partitionStates.get(tp.topic()).flatMap(_.get(tp.partition())).map { partitionInfo =>
val replicaIds = partitionInfo.basePartitionState.replicas
replicaIds.asScala
.map(replicaId => replicaId.intValue() -> {
snapshot.aliveBrokers.get(replicaId.longValue()) match {
case Some(broker) =>
broker.getNode(listenerName).getOrElse(Node.noNode())
case None =>
Node.noNode()
}}).toMap
.filter(pair => pair match {
case (_, node) => !node.isEmpty
})
}.getOrElse(Map.empty[Int, Node])
}
def getControllerId: Option[Int] = metadataSnapshot.controllerId
def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {

View File

@ -22,7 +22,7 @@ import java.util.Optional
import kafka.api.Request
import kafka.cluster.BrokerEndPoint
import kafka.log.{LogAppendInfo, LogOffsetSnapshot}
import kafka.log.LogAppendInfo
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UnboundedQuota
import org.apache.kafka.common.TopicPartition
@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.Records
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.FetchResponse.PartitionData
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, mutable}
@ -89,7 +89,8 @@ class ReplicaAlterLogDirsThread(name: String,
request.fetchData.asScala.toSeq,
UnboundedQuota,
processResponseCallback,
request.isolationLevel)
request.isolationLevel,
None)
if (partitionData == null)
throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}")
@ -121,18 +122,13 @@ class ReplicaAlterLogDirsThread(name: String,
}
override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch)
offsetSnapshot.logStartOffset
val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
partition.localLogOrException.logStartOffset
}
override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch)
offsetSnapshot.logEndOffset.messageOffset
}
private def offsetSnapshotFromCurrentReplica(topicPartition: TopicPartition, leaderEpoch: Int): LogOffsetSnapshot = {
val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
partition.fetchOffsetSnapshot(Optional.of[Integer](leaderEpoch), fetchOnlyFromLeader = false)
partition.localLogOrException.logEndOffset
}
/**

View File

@ -34,12 +34,15 @@ import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Node
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView
import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo}
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.FetchRequest.PartitionData
@ -47,7 +50,10 @@ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{ApiError, DeleteRecordsResponse, DescribeLogDirsResponse, EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, LeaderAndIsrResponse, OffsetsForLeaderEpochRequest, StopReplicaRequest, UpdateMetadataRequest}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
import org.apache.kafka.common.replica.{ClientMetadata, _}
import scala.compat.java8.OptionConverters._
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, mutable}
@ -75,7 +81,8 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
* @param readSize amount of data that was read from the log i.e. size of the fetch
* @param isReadFromLogEnd true if the request read up to the log end offset snapshot
* when the read was initiated, false otherwise
* @param error Exception if error encountered while reading from the log
* @param preferredReadReplica the preferred read replica to be used for future fetches
* @param exception Exception if error encountered while reading from the log
*/
case class LogReadResult(info: FetchDataInfo,
highWatermark: Long,
@ -85,6 +92,8 @@ case class LogReadResult(info: FetchDataInfo,
fetchTimeMs: Long,
readSize: Int,
lastStableOffset: Option[Long],
preferredReadReplica: Option[Int] = None,
followerNeedsHwUpdate: Boolean = false,
exception: Option[Throwable] = None) {
def error: Errors = exception match {
@ -106,7 +115,8 @@ case class FetchPartitionData(error: Errors = Errors.NONE,
logStartOffset: Long,
records: Records,
lastStableOffset: Option[Long],
abortedTransactions: Option[List[AbortedTransaction]])
abortedTransactions: Option[List[AbortedTransaction]],
preferredReadReplica: Option[Int])
/**
@ -216,6 +226,8 @@ class ReplicaManager(val config: KafkaConfig,
}
}
val replicaSelectorOpt: Option[ReplicaSelector] = createReplicaSelector()
val leaderCount = newGauge(
"LeaderCount",
new Gauge[Int] {
@ -807,8 +819,9 @@ class ReplicaManager(val config: KafkaConfig,
}
/**
* Fetch messages from the leader replica, and wait until enough data can be fetched and return;
* the callback function will be triggered either when timeout or required fetch info is satisfied
* Fetch messages from a replica, and wait until enough data can be fetched and return;
* the callback function will be triggered either when timeout or required fetch info is satisfied.
* Consumers may fetch from any replica, but followers can only fetch from the leader.
*/
def fetchMessages(timeout: Long,
replicaId: Int,
@ -818,9 +831,9 @@ class ReplicaManager(val config: KafkaConfig,
fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
isolationLevel: IsolationLevel) {
isolationLevel: IsolationLevel,
clientMetadata: Option[ClientMetadata]) {
val isFromFollower = Request.isValidBrokerId(replicaId)
val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId
val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId)
FetchLogEnd
@ -829,16 +842,16 @@ class ReplicaManager(val config: KafkaConfig,
else
FetchHighWatermark
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
val result = readFromLocalLog(
replicaId = replicaId,
fetchOnlyFromLeader = fetchOnlyFromLeader,
fetchOnlyFromLeader = isFromFollower,
fetchIsolation = fetchIsolation,
fetchMaxBytes = fetchMaxBytes,
hardMaxBytesLimit = hardMaxBytesLimit,
readPartitionInfo = fetchInfos,
quota = quota)
quota = quota,
clientMetadata = clientMetadata)
if (isFromFollower) updateFollowerFetchState(replicaId, result)
else result
}
@ -849,23 +862,37 @@ class ReplicaManager(val config: KafkaConfig,
var bytesReadable: Long = 0
var errorReadingData = false
val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
var anyPartitionsNeedHwUpdate = false
logReadResults.foreach { case (topicPartition, logReadResult) =>
if (logReadResult.error != Errors.NONE)
errorReadingData = true
bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
logReadResultMap.put(topicPartition, logReadResult)
if (isFromFollower && logReadResult.followerNeedsHwUpdate) {
anyPartitionsNeedHwUpdate = true
}
}
// Wrap the given callback function with another function that will update the HW for the remote follower
val updateHwAndThenCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit =
(fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]) => {
fetchPartitionData.foreach {
case (tp, partitionData) => updateFollowerHighWatermark(tp, replicaId, partitionData.highWatermark)
}
responseCallback(fetchPartitionData)
}
// respond immediately if 1) fetch request does not want to wait
// 2) fetch request does not require any data
// 3) has enough data to respond
// 4) some error happens while reading data
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
// 5) all the requested partitions need HW update
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || anyPartitionsNeedHwUpdate) {
val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
result.lastStableOffset, result.info.abortedTransactions)
result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica)
}
responseCallback(fetchPartitionData)
updateHwAndThenCallback(fetchPartitionData)
} else {
// construct the fetch results from the read results
val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
@ -875,9 +902,10 @@ class ReplicaManager(val config: KafkaConfig,
fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
})
}
val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, isFromFollower,
fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
updateHwAndThenCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
@ -898,7 +926,8 @@ class ReplicaManager(val config: KafkaConfig,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {
quota: ReplicaQuota,
clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = {
def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
val offset = fetchInfo.fetchOffset
@ -917,35 +946,64 @@ class ReplicaManager(val config: KafkaConfig,
val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
val fetchTimeMs = time.milliseconds
// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
val readInfo = partition.readRecords(
fetchOffset = fetchInfo.fetchOffset,
currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
maxBytes = adjustedMaxBytes,
fetchIsolation = fetchIsolation,
fetchOnlyFromLeader = fetchOnlyFromLeader,
minOneMessage = minOneMessage)
// If we are the leader, determine the preferred read-replica
val preferredReadReplica = clientMetadata.flatMap(
metadata => findPreferredReadReplica(tp, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs))
val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) {
// If the partition is being throttled, simply return an empty set.
FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
// progress in such cases and don't need to report a `RecordTooLargeException`
FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
if (preferredReadReplica.isDefined) {
replicaSelectorOpt.foreach{ selector =>
debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
s"${preferredReadReplica.get} for $clientMetadata")
}
// If a preferred read-replica is set, skip the read
val offsetSnapshot: LogOffsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, false)
LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
highWatermark = offsetSnapshot.highWatermark.messageOffset,
leaderLogStartOffset = offsetSnapshot.logStartOffset,
leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = -1L,
readSize = 0,
lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
preferredReadReplica = preferredReadReplica,
exception = None)
} else {
readInfo.fetchedData
}
// Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
val readInfo: LogReadInfo = partition.readRecords(
fetchOffset = fetchInfo.fetchOffset,
currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
maxBytes = adjustedMaxBytes,
fetchIsolation = fetchIsolation,
fetchOnlyFromLeader = fetchOnlyFromLeader,
minOneMessage = minOneMessage)
LogReadResult(info = fetchDataInfo,
highWatermark = readInfo.highWatermark,
leaderLogStartOffset = readInfo.logStartOffset,
leaderLogEndOffset = readInfo.logEndOffset,
followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = fetchTimeMs,
readSize = adjustedMaxBytes,
lastStableOffset = Some(readInfo.lastStableOffset),
exception = None)
// Check if the HW known to the follower is behind the actual HW
val followerNeedsHwUpdate: Boolean = partition.getReplica(replicaId)
.exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark)
val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) {
// If the partition is being throttled, simply return an empty set.
FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
// progress in such cases and don't need to report a `RecordTooLargeException`
FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
} else {
readInfo.fetchedData
}
LogReadResult(info = fetchDataInfo,
highWatermark = readInfo.highWatermark,
leaderLogStartOffset = readInfo.logStartOffset,
leaderLogEndOffset = readInfo.logEndOffset,
followerLogStartOffset = followerLogStartOffset,
fetchTimeMs = fetchTimeMs,
readSize = adjustedMaxBytes,
lastStableOffset = Some(readInfo.lastStableOffset),
preferredReadReplica = preferredReadReplica,
followerNeedsHwUpdate = followerNeedsHwUpdate,
exception = None)
}
} catch {
// NOTE: Failed fetch requests metric is not incremented for known exceptions since it
// is supposed to indicate un-expected failure of a broker in handling a fetch request
@ -1000,6 +1058,59 @@ class ReplicaManager(val config: KafkaConfig,
result
}
/**
* Using the configured [[ReplicaSelector]], determine the preferred read replica for a partition given the
* client metadata, the requested offset, and the current set of replicas. If the preferred read replica is the
* leader, return None
*/
def findPreferredReadReplica(tp: TopicPartition,
clientMetadata: ClientMetadata,
replicaId: Int,
fetchOffset: Long,
currentTimeMs: Long): Option[Int] = {
val partition = getPartitionOrException(tp, expectLeader = false)
if (partition.isLeader) {
if (Request.isValidBrokerId(replicaId)) {
// Don't look up preferred for follower fetches via normal replication
Option.empty
} else {
replicaSelectorOpt.flatMap { replicaSelector =>
val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(tp, new ListenerName(clientMetadata.listenerName))
var replicaInfoSet: Set[ReplicaView] = partition.remoteReplicas
// Exclude replicas that don't have the requested offset (whether or not if they're in the ISR)
.filter(replica => replica.logEndOffset >= fetchOffset)
.filter(replica => replica.logStartOffset <= fetchOffset)
.map(replica => new DefaultReplicaView(
replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
replica.logEndOffset,
currentTimeMs - replica.lastCaughtUpTimeMs
))
if (partition.leaderReplicaIdOpt.isDefined) {
val leaderReplica: ReplicaView = partition.leaderReplicaIdOpt
.map(replicaId => replicaEndpoints.getOrElse(replicaId, Node.noNode()))
.map(leaderNode => new DefaultReplicaView(leaderNode, partition.localLogOrException.logEndOffset, 0L))
.get
replicaInfoSet ++= Set(leaderReplica)
val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
replicaSelector.select(tp, clientMetadata, partitionInfo).asScala
.filter(!_.endpoint.isEmpty)
// Even though the replica selector can return the leader, we don't want to send it out with the
// FetchResponse, so we exclude it here
.filter(!_.equals(leaderReplica))
.map(_.endpoint.id)
} else {
None
}
}
}
} else {
None
}
}
/**
* To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list,
* the quota is exceeded and the replica is not in sync.
@ -1424,6 +1535,15 @@ class ReplicaManager(val config: KafkaConfig,
}
}
private def updateFollowerHighWatermark(topicPartition: TopicPartition, followerId: Int, highWatermark: Long): Unit = {
nonOfflinePartition(topicPartition).flatMap(_.getReplica(followerId)) match {
case Some(replica) => replica.updateLastSentHighWatermark(highWatermark)
case None =>
warn(s"While updating the HW for follower $followerId for partition $topicPartition, " +
s"the replica could not be found.")
}
}
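To make the intent of this helper concrete, here is a hypothetical call site written as if it sat next to the helper above; the surrounding names (`recordSentHighWatermarks`, `logReadResults`) are illustrative and not taken from this patch. After a follower fetch has been served, the leader remembers the high watermark it just shipped so a later delayed fetch can detect that the follower is still behind.

// Hypothetical sketch: record the HW propagated to a follower after serving its fetch.
def recordSentHighWatermarks(replicaId: Int,
                             logReadResults: Seq[(TopicPartition, LogReadResult)]): Unit = {
  if (Request.isValidBrokerId(replicaId)) {
    logReadResults.foreach { case (topicPartition, readResult) =>
      updateFollowerHighWatermark(topicPartition, replicaId, readResult.highWatermark)
    }
  }
}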
private def leaderPartitionsIterator: Iterator[Partition] =
nonOfflinePartitionsIterator.filter(_.leaderLogIfLocal.isDefined)
@ -1516,6 +1636,7 @@ class ReplicaManager(val config: KafkaConfig,
delayedElectLeaderPurgatory.shutdown()
if (checkpointHW)
checkpointHighWatermarks()
replicaSelectorOpt.foreach(_.close)
info("Shut down completely")
}
@ -1527,6 +1648,14 @@ class ReplicaManager(val config: KafkaConfig,
new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats)
}
protected def createReplicaSelector(): Option[ReplicaSelector] = {
config.replicaSelectorClassName.map { className =>
val selector: ReplicaSelector = CoreUtils.createObject[ReplicaSelector](className)
selector.configure(config.originals())
selector
}
}
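A selector is only created when `replica.selector.class` (`KafkaConfig.ReplicaSelectorClassProp`) is set; otherwise `replicaSelectorOpt` stays `None` and the leader never recommends a preferred read replica. A minimal sketch of enabling the rack-aware selector from KIP-392 in a broker config, reusing test helpers that appear later in this change (the fully-qualified class name is the one proposed by the KIP and is an assumption here):

// Sketch: enable the KIP-392 rack-aware selector for a broker under test.
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put(KafkaConfig.ReplicaSelectorClassProp,
  "org.apache.kafka.common.replica.RackAwareReplicaSelector")
val config = KafkaConfig.fromProps(props)
// createReplicaSelector() now instantiates the class reflectively and calls configure() on it.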
def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, EpochEndOffset] = {
requestedEpochInfo.map { case (tp, partitionData) =>
val epochEndOffset = getPartition(tp) match {

View File

@ -20,7 +20,8 @@ import java.util.Optional
import scala.collection.Seq
import kafka.cluster.Partition
import kafka.cluster.{Partition, Replica}
import kafka.log.LogOffsetSnapshot
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.FencedLeaderEpochException
import org.apache.kafka.common.protocol.Errors
@ -58,6 +59,7 @@ class DelayedFetchTest extends EasyMockSupport {
fetchMetadata = fetchMetadata,
replicaManager = replicaManager,
quota = replicaQuota,
clientMetadata = None,
responseCallback = callback)
val partition: Partition = mock(classOf[Partition])
@ -79,6 +81,89 @@ class DelayedFetchTest extends EasyMockSupport {
assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.error)
}
def checkCompleteWhenFollowerLaggingHW(followerHW: Option[Long], checkResult: DelayedFetch => Unit): Unit = {
val topicPartition = new TopicPartition("topic", 0)
val fetchOffset = 500L
val logStartOffset = 0L
val currentLeaderEpoch = Optional.of[Integer](10)
val replicaId = 1
val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus)
var fetchResultOpt: Option[FetchPartitionData] = None
def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
fetchResultOpt = Some(responses.head._2)
}
val delayedFetch = new DelayedFetch(
delayMs = 500,
fetchMetadata = fetchMetadata,
replicaManager = replicaManager,
quota = replicaQuota,
clientMetadata = None,
responseCallback = callback
)
val partition: Partition = mock(classOf[Partition])
EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
.andReturn(partition)
EasyMock.expect(partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true))
.andReturn(
LogOffsetSnapshot(
logStartOffset = 0,
logEndOffset = new LogOffsetMetadata(500L),
highWatermark = new LogOffsetMetadata(480L),
lastStableOffset = new LogOffsetMetadata(400L)))
expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo)
val follower = new Replica(replicaId, topicPartition)
followerHW.foreach(hw => {
follower.updateFetchState(LogOffsetMetadata.UnknownOffsetMetadata, 0L, 0L, 0L)
follower.updateLastSentHighWatermark(hw)
})
EasyMock.expect(partition.getReplica(replicaId))
.andReturn(Some(follower))
replayAll()
checkResult.apply(delayedFetch)
}
@Test
def testCompleteWhenFollowerLaggingHW(): Unit = {
// No HW from the follower, should complete
resetAll
checkCompleteWhenFollowerLaggingHW(None, delayedFetch => {
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
})
// A higher HW from the follower (shouldn't actually be possible)
resetAll
checkCompleteWhenFollowerLaggingHW(Some(500), delayedFetch => {
assertFalse(delayedFetch.tryComplete())
assertFalse(delayedFetch.isCompleted)
})
// An equal HW from the follower
resetAll
checkCompleteWhenFollowerLaggingHW(Some(480), delayedFetch => {
assertFalse(delayedFetch.tryComplete())
assertFalse(delayedFetch.isCompleted)
})
// A lower HW from the follower should complete the fetch
resetAll
checkCompleteWhenFollowerLaggingHW(Some(470), delayedFetch => {
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
})
}
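The four cases above reduce to a single comparison between the leader's high watermark and the last high watermark sent to the follower. A rough sketch of that predicate, assuming `Replica` exposes a `lastSentHighWatermark` accessor matching the `updateLastSentHighWatermark` setter used above (the real check is part of `DelayedFetch.tryComplete`):

// Complete the delayed fetch only when the follower's last known HW is strictly below the leader's.
def followerNeedsHwUpdate(partition: Partition, followerId: Int, leaderHw: Long): Boolean =
  partition.getReplica(followerId) match {
    case Some(replica) => replica.lastSentHighWatermark < leaderHw
    case None => false // unknown follower: fall back to the usual min-bytes/timeout path
  }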
private def buildFetchMetadata(replicaId: Int,
topicPartition: TopicPartition,
fetchStatus: FetchPartitionStatus): FetchMetadata = {
@ -103,10 +188,38 @@ class DelayedFetchTest extends EasyMockSupport {
fetchMaxBytes = maxBytes,
hardMaxBytesLimit = false,
readPartitionInfo = Seq((topicPartition, fetchPartitionData)),
clientMetadata = None,
quota = replicaQuota))
.andReturn(Seq((topicPartition, buildReadResultWithError(error))))
}
private def expectReadFromReplica(replicaId: Int,
topicPartition: TopicPartition,
fetchPartitionData: FetchRequest.PartitionData): Unit = {
val result = LogReadResult(
exception = None,
info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
highWatermark = -1L,
leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
followerLogStartOffset = -1L,
fetchTimeMs = -1L,
readSize = -1,
lastStableOffset = None)
EasyMock.expect(replicaManager.readFromLocalLog(
replicaId = replicaId,
fetchOnlyFromLeader = true,
fetchIsolation = FetchLogEnd,
fetchMaxBytes = maxBytes,
hardMaxBytesLimit = false,
readPartitionInfo = Seq((topicPartition, fetchPartitionData)),
clientMetadata = None,
quota = replicaQuota))
.andReturn(Seq((topicPartition, result))).anyTimes()
}
private def buildReadResultWithError(error: Errors): LogReadResult = {
LogReadResult(
exception = Some(error.exception),

View File

@ -242,9 +242,11 @@ class PartitionTest {
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
def assertSnapshotError(expectedError: Errors, currentLeaderEpoch: Optional[Integer]): Unit = {
partition.fetchOffsetSnapshotOrError(currentLeaderEpoch, fetchOnlyFromLeader = true) match {
case Left(_) => assertEquals(Errors.NONE, expectedError)
case Right(error) => assertEquals(expectedError, error)
try {
partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true)
assertEquals(Errors.NONE, expectedError)
} catch {
case error: ApiException => assertEquals(expectedError, Errors.forException(error))
}
}
@ -262,9 +264,11 @@ class PartitionTest {
def assertSnapshotError(expectedError: Errors,
currentLeaderEpoch: Optional[Integer],
fetchOnlyLeader: Boolean): Unit = {
partition.fetchOffsetSnapshotOrError(currentLeaderEpoch, fetchOnlyFromLeader = fetchOnlyLeader) match {
case Left(_) => assertEquals(expectedError, Errors.NONE)
case Right(error) => assertEquals(expectedError, error)
try {
partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = fetchOnlyLeader)
assertEquals(Errors.NONE, expectedError)
} catch {
case error: ApiException => assertEquals(expectedError, Errors.forException(error))
}
}
@ -1039,6 +1043,7 @@ class PartitionTest {
assertEquals(time.milliseconds(), remoteReplica.lastCaughtUpTimeMs)
assertEquals(6L, remoteReplica.logEndOffset)
assertEquals(0L, remoteReplica.logStartOffset)
}
@Test

View File

@ -3693,6 +3693,37 @@ class LogTest {
assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset))
}
@Test
def testOffsetSnapshot() {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
// append a few records
appendAsFollower(log, MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord("a".getBytes),
new SimpleRecord("b".getBytes),
new SimpleRecord("c".getBytes)), 5)
log.highWatermark = 2L
var offsets: LogOffsetSnapshot = log.offsetSnapshot
assertEquals(2L, offsets.highWatermark.messageOffset)
assertFalse(offsets.highWatermark.messageOffsetOnly)
// Taking the snapshot again should return the same, fully-resolved high watermark metadata
offsets = log.offsetSnapshot
assertEquals(2L, offsets.highWatermark.messageOffset)
assertFalse(offsets.highWatermark.messageOffsetOnly)
try {
log.highWatermark = 100L
offsets = log.offsetSnapshot
fail("Should have thrown OffsetOutOfRangeException")
} catch {
case _: OffsetOutOfRangeException => // expected
}
}
@Test
def testLastStableOffsetWithMixedProducerData() {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)

View File

@ -205,7 +205,7 @@ class FetchRequestTest extends BaseRequestTest {
Seq(topicPartition))).build()
val fetchResponse = sendFetchRequest(nonReplicaId, fetchRequest)
val partitionData = fetchResponse.responseData.get(topicPartition)
assertEquals(Errors.NOT_LEADER_FOR_PARTITION, partitionData.error)
assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionData.error)
}
@Test
@ -238,8 +238,8 @@ class FetchRequestTest extends BaseRequestTest {
// Check follower error codes
val followerId = TestUtils.findFollowerId(topicPartition, servers)
assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.empty())
assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.of(secondLeaderEpoch))
assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.empty())
assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.of(secondLeaderEpoch))
assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch + 1))
assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
}

View File

@ -50,6 +50,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
import EasyMock._
import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.replica.ClientMetadata
import org.junit.Assert.{assertEquals, assertNull, assertTrue}
import org.junit.{After, Test}
@ -464,14 +465,15 @@ class KafkaApisTest {
replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
anyObject[Seq[(TopicPartition, FetchRequest.PartitionData)]], anyObject[ReplicaQuota],
anyObject[Seq[(TopicPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel])
anyObject[Seq[(TopicPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel],
anyObject[Option[ClientMetadata]])
expectLastCall[Unit].andAnswer(new IAnswer[Unit] {
def answer: Unit = {
val callback = getCurrentArguments.apply(7).asInstanceOf[(Seq[(TopicPartition, FetchPartitionData)] => Unit)]
val records = MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8)))
callback(Seq(tp -> new FetchPartitionData(Errors.NONE, hw, 0, records,
None, None)))
None, None, Option.empty)))
}
})

View File

@ -655,6 +655,7 @@ class KafkaConfigTest {
case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaFetchResponseMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaSelectorClassProp => // Accepts any string, so there is no invalid value to assert
case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")

View File

@ -395,6 +395,7 @@ class ReplicaAlterLogDirsThreadTest {
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.capture(responseCallback),
EasyMock.anyObject(),
EasyMock.anyObject()))
.andAnswer(new IAnswer[Unit] {
override def answer(): Unit = {
@ -629,6 +630,7 @@ class ReplicaAlterLogDirsThreadTest {
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.capture(responseCallback),
EasyMock.anyObject(),
EasyMock.anyObject()))
.andAnswer(new IAnswer[Unit] {
override def answer(): Unit = {

View File

@ -64,7 +64,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
quota = quota)
quota = quota,
clientMetadata = None)
assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
@ -89,7 +90,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
quota = quota)
quota = quota,
clientMetadata = None)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
@ -113,7 +115,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
quota = quota)
quota = quota,
clientMetadata = None)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
@ -137,7 +140,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
quota = quota)
quota = quota,
clientMetadata = None)
assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
@ -167,6 +171,7 @@ class ReplicaManagerQuotasTest {
EasyMock.expect(replicaManager.shouldLeaderThrottle(EasyMock.anyObject[ReplicaQuota], EasyMock.anyObject[TopicPartition], EasyMock.anyObject[Int]))
.andReturn(!isReplicaInSync).anyTimes()
EasyMock.expect(partition.getReplica(1)).andReturn(None)
EasyMock.replay(replicaManager, partition)
val tp = new TopicPartition("t1", 0)
@ -179,9 +184,10 @@ class ReplicaManagerQuotasTest {
fetchIsolation = FetchLogEnd,
isFromFollower = true,
replicaId = 1,
fetchPartitionStatus = List((tp, fetchPartitionStatus)))
fetchPartitionStatus = List((tp, fetchPartitionStatus))
)
new DelayedFetch(delayMs = 600, fetchMetadata = fetchMetadata, replicaManager = replicaManager,
quota = null, responseCallback = null) {
quota = null, clientMetadata = None, responseCallback = null) {
override def forceComplete(): Boolean = true
}
}

View File

@ -18,10 +18,13 @@
package kafka.server
import java.io.File
import java.util.{Optional, Properties}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.net.InetAddress
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.{Optional, Properties}
import kafka.api.Request
import kafka.cluster.BrokerEndPoint
import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
import kafka.utils.{MockScheduler, MockTime, TestUtils}
import TestUtils.createBroker
@ -29,16 +32,22 @@ import kafka.cluster.BrokerEndPoint
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.server.checkpoints.LazyOffsetCheckpoints
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.TestUtils.createBroker
import kafka.utils.timer.MockTimer
import kafka.utils.{MockScheduler, MockTime, TestUtils}
import kafka.zk.KafkaZkClient
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, IsolationLevel, LeaderAndIsrRequest}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.zookeeper.data.Stat
@ -179,12 +188,6 @@ class ReplicaManagerTest {
assertEquals(Errors.NOT_LEADER_FOR_PARTITION, response.error)
}
// Fetch some messages
val fetchResult = fetchAsConsumer(rm, new TopicPartition(topic, 0),
new PartitionData(0, 0, 100000, Optional.empty()),
minBytes = 100000)
assertFalse(fetchResult.isFired)
// Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
collection.immutable.Map(new TopicPartition(topic, 0) ->
@ -193,7 +196,6 @@ class ReplicaManagerTest {
rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
assertTrue(appendResult.isFired)
assertTrue(fetchResult.isFired)
} finally {
rm.shutdown(checkpointHW = false)
}
@ -515,7 +517,8 @@ class ReplicaManagerTest {
fetchInfos = Seq(tp -> validFetchPartitionData),
quota = UnboundedQuota,
isolationLevel = IsolationLevel.READ_UNCOMMITTED,
responseCallback = callback
responseCallback = callback,
clientMetadata = None
)
assertTrue(successfulFetch.isDefined)
@ -537,7 +540,8 @@ class ReplicaManagerTest {
fetchInfos = Seq(tp -> invalidFetchPartitionData),
quota = UnboundedQuota,
isolationLevel = IsolationLevel.READ_UNCOMMITTED,
responseCallback = callback
responseCallback = callback,
clientMetadata = None
)
assertTrue(successfulFetch.isDefined)
@ -617,7 +621,8 @@ class ReplicaManagerTest {
tp1 -> new PartitionData(1, 0, 100000, Optional.empty())),
quota = UnboundedQuota,
responseCallback = fetchCallback,
isolationLevel = IsolationLevel.READ_UNCOMMITTED
isolationLevel = IsolationLevel.READ_UNCOMMITTED,
clientMetadata = None
)
val tp0Log = replicaManager.localLog(tp0)
assertTrue(tp0Log.isDefined)
@ -678,6 +683,157 @@ class ReplicaManagerTest {
EasyMock.verify(mockLogMgr)
}
@Test
def testReplicaSelector(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
val controllerId = 0
val leaderEpoch = 1
val leaderEpochIncrement = 2
val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
val partition = replicaManager.createPartition(new TopicPartition(topic, topicPartition))
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
partition.createLogIfNotExists(leaderBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
partition.makeLeader(
controllerId,
leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
correlationId,
offsetCheckpoints
)
val tp0 = new TopicPartition(topic, 0)
val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
// The selector is expected to pick the leader, in which case findPreferredReadReplica returns None
val preferredReadReplica: Option[Int] = replicaManager.findPreferredReadReplica(
tp0, metadata, Request.OrdinaryConsumerId, 1L, System.currentTimeMillis)
assertFalse(preferredReadReplica.isDefined)
}
@Test
def testPreferredReplicaAsFollower(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
val leaderEpoch = 1
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
val brokerList = Seq[Integer](0, 1).asJava
val tp0 = new TopicPartition(topic, 0)
replicaManager.createPartition(new TopicPartition(topic, 0))
// Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
collection.immutable.Map(new TopicPartition(topic, 0) ->
new LeaderAndIsrRequest.PartitionState(0, 1, 1, brokerList, 0, brokerList, false)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
val consumerResult = fetchAsConsumer(replicaManager, tp0,
new PartitionData(0, 0, 100000, Optional.empty()),
clientMetadata = Some(metadata))
// Fetch from follower succeeds
assertTrue(consumerResult.isFired)
// But only the leader computes a preferred read replica
assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty)
}
@Test
def testPreferredReplicaAsLeader(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
val leaderEpoch = 1
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
val brokerList = Seq[Integer](0, 1).asJava
val tp0 = new TopicPartition(topic, 0)
replicaManager.createPartition(new TopicPartition(topic, 0))
// Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
collection.immutable.Map(new TopicPartition(topic, 0) ->
new LeaderAndIsrRequest.PartitionState(0, 0, 1, brokerList, 0, brokerList, false)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
val consumerResult = fetchAsConsumer(replicaManager, tp0,
new PartitionData(0, 0, 100000, Optional.empty()),
clientMetadata = Some(metadata))
// Fetch from follower succeeds
assertTrue(consumerResult.isFired)
// The preferred replica would be the leader itself, so no preferred read replica is returned
assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined)
}
@Test(expected = classOf[ClassNotFoundException])
def testUnknownReplicaSelector(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
val leaderEpoch = 1
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
val props = new Properties()
props.put(KafkaConfig.ReplicaSelectorClassProp, "non-a-class")
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = props)
}
@Test
def testDefaultReplicaSelector(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
val leaderEpoch = 1
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
assertFalse(replicaManager.replicaSelectorOpt.isDefined)
}
/**
* This method assumes that the test using created ReplicaManager calls
* ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing
@ -688,9 +844,11 @@ class ReplicaManagerTest {
followerBrokerId: Int,
leaderBrokerId: Int,
countDownLatch: CountDownLatch,
expectTruncation: Boolean) : (ReplicaManager, LogManager) = {
expectTruncation: Boolean,
extraProps: Properties = new Properties()) : (ReplicaManager, LogManager) = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
props.asScala ++= extraProps.asScala
val config = KafkaConfig.fromProps(props)
// Setup mock local log to have leader epoch of 3 and offset of 10
@ -749,9 +907,16 @@ class ReplicaManagerTest {
.andReturn(Option(createBroker(brokerId, s"host$brokerId", brokerId)))
.anyTimes
}
EasyMock
.expect(metadataCache.getPartitionReplicaEndpoints(
EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(Map(
leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"),
followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap
)
.anyTimes()
EasyMock.replay(metadataCache)
val timer = new MockTimer
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", timer, reaperEnabled = false)
@ -860,16 +1025,18 @@ class ReplicaManagerTest {
partition: TopicPartition,
partitionData: PartitionData,
minBytes: Int = 0,
isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = {
fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel)
isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED,
clientMetadata: Option[ClientMetadata] = None): CallbackResult[FetchPartitionData] = {
fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel, clientMetadata)
}
private def fetchAsFollower(replicaManager: ReplicaManager,
partition: TopicPartition,
partitionData: PartitionData,
minBytes: Int = 0,
isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = {
fetchMessages(replicaManager, replicaId = 1, partition, partitionData, minBytes, isolationLevel)
isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED,
clientMetadata: Option[ClientMetadata] = None): CallbackResult[FetchPartitionData] = {
fetchMessages(replicaManager, replicaId = 1, partition, partitionData, minBytes, isolationLevel, clientMetadata)
}
private def fetchMessages(replicaManager: ReplicaManager,
@ -877,7 +1044,8 @@ class ReplicaManagerTest {
partition: TopicPartition,
partitionData: PartitionData,
minBytes: Int,
isolationLevel: IsolationLevel): CallbackResult[FetchPartitionData] = {
isolationLevel: IsolationLevel,
clientMetadata: Option[ClientMetadata]): CallbackResult[FetchPartitionData] = {
val result = new CallbackResult[FetchPartitionData]()
def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
assertEquals(1, responseStatus.size)
@ -895,7 +1063,9 @@ class ReplicaManagerTest {
fetchInfos = Seq(partition -> partitionData),
quota = UnboundedQuota,
responseCallback = fetchCallback,
isolationLevel = isolationLevel)
isolationLevel = isolationLevel,
clientMetadata = clientMetadata
)
result
}

View File

@ -178,7 +178,8 @@ class SimpleFetchTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
quota = UnboundedQuota).find(_._1 == topicPartition)
quota = UnboundedQuota,
clientMetadata = None).find(_._1 == topicPartition)
val firstReadRecord = readCommittedRecords.get._2.info.records.records.iterator.next()
assertEquals("Reading committed data should return messages only up to high watermark", recordToHW,
new SimpleRecord(firstReadRecord))
@ -190,7 +191,8 @@ class SimpleFetchTest {
fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo,
quota = UnboundedQuota).find(_._1 == topicPartition)
quota = UnboundedQuota,
clientMetadata = None).find(_._1 == topicPartition)
val firstRecord = readAllRecords.get._2.info.records.records.iterator.next()
assertEquals("Reading any data can return messages up to the end of the log", recordToLEO,

View File

@ -86,6 +86,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
jaas_override_variables A dict of variables to be used in the jaas.conf template file
kafka_opts_override Override parameters of the KAFKA_OPTS environment variable
client_prop_file_override Override client.properties file used by the consumer
consumer_properties A dict of values to pass in as --consumer-property key=value
"""
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=ConsoleConsumer.PERSISTENT_ROOT)
@ -208,7 +209,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
if self.consumer_properties is not None:
for k, v in self.consumer_properties.items():
cmd += "--consumer_properties %s=%s" % (k, v)
cmd += " --consumer-property %s=%s" % (k, v)
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
return cmd

View File

@ -94,7 +94,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
listener_security_config=ListenerSecurityConfig()):
listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None):
"""
:param context: test context
:param ZookeeperService zk:
@ -129,6 +129,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
self.server_prop_overides = []
else:
self.server_prop_overides = server_prop_overides
if per_node_server_prop_overrides is None:
self.per_node_server_prop_overrides = {}
else:
self.per_node_server_prop_overrides = per_node_server_prop_overrides
self.log_level = "DEBUG"
self.zk_chroot = zk_chroot
self.listener_security_config = listener_security_config
@ -295,6 +299,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
for prop in self.server_prop_overides:
override_configs[prop[0]] = prop[1]
for prop in self.per_node_server_prop_overrides.get(self.idx(node), []):
override_configs[prop[0]] = prop[1]
#update template configs with test override configs
configs.update(override_configs)

View File

@ -27,9 +27,10 @@ class JmxMixin(object):
- we assume the service using JmxMixin also uses KafkaPathResolverMixin
- this uses the --wait option for JmxTool, so the list of object names must be explicit; no patterns are permitted
"""
def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, root="/mnt"):
def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, jmx_poll_ms=1000, root="/mnt"):
self.jmx_object_names = jmx_object_names
self.jmx_attributes = jmx_attributes or []
self.jmx_poll_ms = jmx_poll_ms
self.jmx_port = 9192
self.started = [False] * num_nodes
@ -71,7 +72,7 @@ class JmxMixin(object):
if use_jmxtool_version <= V_0_11_0_0:
use_jmxtool_version = DEV_BRANCH
cmd = "%s %s " % (self.path.script("kafka-run-class.sh", use_jmxtool_version), self.jmx_class_name())
cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
cmd += "--reporting-interval %d --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % (self.jmx_poll_ms, self.jmx_port)
cmd += " --wait"
for jmx_object_name in self.jmx_object_names:
cmd += " --object-name %s" % jmx_object_name
@ -83,7 +84,7 @@ class JmxMixin(object):
self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
node.account.ssh(cmd, allow_fail=False)
wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
wait_until(lambda: self._jmx_has_output(node), timeout_sec=30, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
self.started[idx-1] = True
def _jmx_has_output(self, node):

View File

@ -21,7 +21,6 @@ from kafkatest.services.kafka import TopicPartition
from kafkatest.services.verifiable_consumer import VerifiableConsumer
class TruncationTest(VerifiableConsumerTest):
TOPIC = "test_topic"
NUM_PARTITIONS = 1