KAFKA-8443; Broker support for fetch from followers (#6832)

A follow-up to #6731, this PR adds broker-side support for [KIP-392](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica) (fetch from followers).

Changes:
* All brokers will handle FetchRequest regardless of leadership
* Leaders can compute a preferred replica to return to the client
* New ReplicaSelector interface for determining the preferred replica (see the configuration sketch below this list)
* Incremental fetches will include partitions with no records if the preferred replica has been computed
* Adds a new JMX attribute exposing the current preferred read replica of a partition in the consumer
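For orientation, here is a minimal, hedged sketch of how the feature is enabled end-to-end. The broker property `replica.selector.class` and the `RackAwareReplicaSelector` class come from this PR, `broker.rack` is the existing broker rack config, and `client.rack` is the consumer-side config from KIP-392 (added on the client in #6731). The bootstrap address, group id, rack names, and the example class name are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RackAwareFetchExample {
    public static void main(String[] args) {
        // Broker side (server.properties) -- selector added in this PR plus the existing rack config:
        //   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
        //   broker.rack=rack-b
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder address
        props.put("group.id", "kip392-demo");                // placeholder group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // KIP-392 client-side config: the consumer advertises its rack in FetchRequest v11,
        // and the leader may answer with a preferred read replica in the same rack.
        props.put("client.rack", "rack-b");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() and poll() as usual; the switch to a follower is transparent to the application.
        }
    }
}
```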

Two new conditions were added for completing a delayed fetch. They both relate to communicating the high watermark to followers without waiting for a timeout:
* For regular fetches, if the high watermark changes within a single fetch request
* For incremental fetch sessions, if the follower's high watermark is lower than the leader's (sketched below)
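A minimal sketch of that second check, using illustrative names (the real logic lives in `DelayedFetch.tryComplete` and `Replica.lastSentHighWatermark` in the diff below):

```java
// Hypothetical, simplified version of the new completion check for follower fetches.
// Names are illustrative; see DelayedFetch.scala in this PR for the actual code path.
final class HighWatermarkPropagation {
    static boolean shouldCompleteDelayedFetch(boolean fetchIsFromFollower,
                                              long leaderHighWatermark,
                                              long lastHighWatermarkSentToFollower) {
        // If the leader's HW has moved past what this follower last saw, answer immediately
        // rather than waiting for min.bytes to accumulate or the fetch timeout to expire.
        return fetchIsFromFollower && leaderHighWatermark > lastHighWatermarkSentToFollower;
    }
}
```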

A new JMX attribute `preferred-read-replica` was added to the `kafka.consumer:type=consumer-fetch-manager-metrics,client-id=some-consumer,topic=my-topic,partition=0` object. It supports the new system test, which verifies that fetch-from-follower behavior works end-to-end, and it may also be useful when debugging consumer issues.
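For example, the attribute can be read over JMX roughly like this (assuming the standard platform MBean server; the client-id, topic, and partition in the object name are placeholders matching the pattern above):

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class PreferredReadReplicaProbe {
    // Reads the new gauge from a consumer running in the same JVM. Returns -1 while the
    // consumer is still fetching from the partition leader.
    public static int currentPreferredReadReplica() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName(
                "kafka.consumer:type=consumer-fetch-manager-metrics," +
                "client-id=some-consumer,topic=my-topic,partition=0");
        Object value = server.getAttribute(name, "preferred-read-replica");
        return ((Number) value).intValue();
    }
}
```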

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jun Rao <junrao@gmail.com>, Jason Gustafson <jason@confluent.io>
David Arthur 2019-07-04 11:18:51 -04:00 committed by Jason Gustafson
parent 41e1c13a52
commit 23beeea34b
34 changed files with 1235 additions and 133 deletions

View File

@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@ -42,6 +43,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@ -1706,8 +1708,20 @@ public class Fetcher<K, V> implements Closeable {
if (!newAssignedPartitions.contains(tp)) {
metrics.removeSensor(partitionLagMetricName(tp));
metrics.removeSensor(partitionLeadMetricName(tp));
metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
}
}
for (TopicPartition tp : newAssignedPartitions) {
if (!this.assignedPartitions.contains(tp)) {
MetricName metricName = partitionPreferredReadReplicaMetricName(tp);
if (metrics.metric(metricName) == null) {
metrics.addMetric(metricName, (Gauge<Integer>) (config, now) ->
subscription.preferredReadReplica(tp, 0L).orElse(-1));
}
}
}
this.assignedPartitions = newAssignedPartitions;
this.assignmentId = newAssignmentId;
}
@ -1719,9 +1733,7 @@ public class Fetcher<K, V> implements Closeable {
String name = partitionLeadMetricName(tp);
Sensor recordsLead = this.metrics.getSensor(name);
if (recordsLead == null) {
-Map<String, String> metricTags = new HashMap<>(2);
-metricTags.put("topic", tp.topic().replace('.', '_'));
-metricTags.put("partition", String.valueOf(tp.partition()));
+Map<String, String> metricTags = topicPartitionTags(tp);
recordsLead = this.metrics.sensor(name);
@ -1738,10 +1750,7 @@ public class Fetcher<K, V> implements Closeable {
String name = partitionLagMetricName(tp);
Sensor recordsLag = this.metrics.getSensor(name);
if (recordsLag == null) {
-Map<String, String> metricTags = new HashMap<>(2);
-metricTags.put("topic", tp.topic().replace('.', '_'));
-metricTags.put("partition", String.valueOf(tp.partition()));
+Map<String, String> metricTags = topicPartitionTags(tp);
recordsLag = this.metrics.sensor(name);
recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, metricTags), new Value());
@ -1759,6 +1768,17 @@ public class Fetcher<K, V> implements Closeable {
return tp + ".records-lead";
}
private MetricName partitionPreferredReadReplicaMetricName(TopicPartition tp) {
Map<String, String> metricTags = topicPartitionTags(tp);
return this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, metricTags);
}
private Map<String, String> topicPartitionTags(TopicPartition tp) {
Map<String, String> metricTags = new HashMap<>(2);
metricTags.put("topic", tp.topic().replace('.', '_'));
metricTags.put("partition", String.valueOf(tp.partition()));
return metricTags;
}
}
@Override

View File

@ -54,6 +54,7 @@ public class FetcherMetricsRegistry {
public MetricNameTemplate partitionRecordsLead;
public MetricNameTemplate partitionRecordsLeadMin;
public MetricNameTemplate partitionRecordsLeadAvg;
public MetricNameTemplate partitionPreferredReadReplica;
public FetcherMetricsRegistry() {
this(new HashSet<String>(), "");
@ -139,7 +140,9 @@ public class FetcherMetricsRegistry {
"The min lead of the partition", partitionTags); "The min lead of the partition", partitionTags);
this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName, this.partitionRecordsLeadAvg = new MetricNameTemplate("records-lead-avg", groupName,
"The average lead of the partition", partitionTags); "The average lead of the partition", partitionTags);
this.partitionPreferredReadReplica = new MetricNameTemplate(
"preferred-read-replica", "consumer-fetch-manager-metrics",
"The current read replica for the partition, or -1 if reading from leader", partitionTags);
} }
public List<MetricNameTemplate> getAllTemplates() { public List<MetricNameTemplate> getAllTemplates() {
@ -171,7 +174,8 @@ public class FetcherMetricsRegistry {
partitionRecordsLagMax,
partitionRecordsLead,
partitionRecordsLeadMin,
-partitionRecordsLeadAvg
+partitionRecordsLeadAvg,
partitionPreferredReadReplica
);
}

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.net.InetAddress;
import java.util.Objects;
/**
* Holder for all the client metadata required to determine a preferred replica.
*/
public interface ClientMetadata {
/**
* Rack ID sent by the client
*/
String rackId();
/**
* Client ID sent by the client
*/
String clientId();
/**
* Incoming address of the client
*/
InetAddress clientAddress();
/**
* Security principal of the client
*/
KafkaPrincipal principal();
/**
* Listener name for the client
*/
String listenerName();
class DefaultClientMetadata implements ClientMetadata {
private final String rackId;
private final String clientId;
private final InetAddress clientAddress;
private final KafkaPrincipal principal;
private final String listenerName;
public DefaultClientMetadata(String rackId, String clientId, InetAddress clientAddress,
KafkaPrincipal principal, String listenerName) {
this.rackId = rackId;
this.clientId = clientId;
this.clientAddress = clientAddress;
this.principal = principal;
this.listenerName = listenerName;
}
@Override
public String rackId() {
return rackId;
}
@Override
public String clientId() {
return clientId;
}
@Override
public InetAddress clientAddress() {
return clientAddress;
}
@Override
public KafkaPrincipal principal() {
return principal;
}
@Override
public String listenerName() {
return listenerName;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultClientMetadata that = (DefaultClientMetadata) o;
return Objects.equals(rackId, that.rackId) &&
Objects.equals(clientId, that.clientId) &&
Objects.equals(clientAddress, that.clientAddress) &&
Objects.equals(principal, that.principal) &&
Objects.equals(listenerName, that.listenerName);
}
@Override
public int hashCode() {
return Objects.hash(rackId, clientId, clientAddress, principal, listenerName);
}
@Override
public String toString() {
return "DefaultClientMetadata{" +
"rackId='" + rackId + '\'' +
", clientId='" + clientId + '\'' +
", clientAddress=" + clientAddress +
", principal=" + principal +
", listenerName='" + listenerName + '\'' +
'}';
}
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
/**
* View of a partition used by {@link ReplicaSelector} to determine a preferred replica.
*/
public interface PartitionView {
Set<ReplicaView> replicas();
ReplicaView leader();
class DefaultPartitionView implements PartitionView {
private final Set<ReplicaView> replicas;
private final ReplicaView leader;
public DefaultPartitionView(Set<ReplicaView> replicas, ReplicaView leader) {
this.replicas = Collections.unmodifiableSet(replicas);
this.leader = leader;
}
@Override
public Set<ReplicaView> replicas() {
return replicas;
}
@Override
public ReplicaView leader() {
return leader;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultPartitionView that = (DefaultPartitionView) o;
return Objects.equals(replicas, that.replicas) &&
Objects.equals(leader, that.leader);
}
@Override
public int hashCode() {
return Objects.hash(replicas, leader);
}
@Override
public String toString() {
return "DefaultPartitionView{" +
"replicas=" + replicas +
", leader=" + leader +
'}';
}
}
}

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import org.apache.kafka.common.TopicPartition;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Returns a replica whose rack id is equal to the rack id specified in the client request metadata. If no such replica
* is found, returns the leader.
*/
public class RackAwareReplicaSelector implements ReplicaSelector {
@Override
public Optional<ReplicaView> select(TopicPartition topicPartition,
ClientMetadata clientMetadata,
PartitionView partitionView) {
if (clientMetadata.rackId() != null && !clientMetadata.rackId().isEmpty()) {
Set<ReplicaView> sameRackReplicas = partitionView.replicas().stream()
.filter(replicaInfo -> clientMetadata.rackId().equals(replicaInfo.endpoint().rack()))
.collect(Collectors.toSet());
if (sameRackReplicas.isEmpty()) {
return Optional.of(partitionView.leader());
} else {
if (sameRackReplicas.contains(partitionView.leader())) {
// Use the leader if it's in this rack
return Optional.of(partitionView.leader());
} else {
// Otherwise, get the most caught-up replica
return sameRackReplicas.stream().max(ReplicaView.comparator());
}
}
} else {
return Optional.of(partitionView.leader());
}
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
/**
* Plug-able interface for selecting a preferred read replica given the current set of replicas for a partition
* and metadata from the client.
*/
public interface ReplicaSelector extends Configurable, Closeable {
/**
* Select the preferred replica a client should use for fetching. If no replica is available, this will return an
* empty optional.
*/
Optional<ReplicaView> select(TopicPartition topicPartition,
ClientMetadata clientMetadata,
PartitionView partitionView);
@Override
default void close() throws IOException {
// No-op by default
}
@Override
default void configure(Map<String, ?> configs) {
// No-op by default
}
}
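To illustrate the plug-in point (this class is not part of the PR), a custom selector that ignores rack and simply prefers the most caught-up replica might look roughly as follows; it would be enabled by setting `replica.selector.class` to its fully qualified name. The class name is made up.

```java
import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.replica.ClientMetadata;
import org.apache.kafka.common.replica.PartitionView;
import org.apache.kafka.common.replica.ReplicaSelector;
import org.apache.kafka.common.replica.ReplicaView;

// Hypothetical example implementation of the new interface, not shipped with Kafka.
public class MostCaughtUpReplicaSelector implements ReplicaSelector {
    @Override
    public Optional<ReplicaView> select(TopicPartition topicPartition,
                                        ClientMetadata clientMetadata,
                                        PartitionView partitionView) {
        // Pick the replica that the ReplicaView comparator considers most caught up,
        // falling back to the leader if no replicas were reported.
        Optional<ReplicaView> mostCaughtUp = partitionView.replicas().stream()
                .max(ReplicaView.comparator());
        return mostCaughtUp.isPresent() ? mostCaughtUp : Optional.of(partitionView.leader());
    }
}
```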

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import org.apache.kafka.common.Node;
import java.util.Comparator;
import java.util.Objects;
/**
* View of a replica used by {@link ReplicaSelector} to determine a preferred replica.
*/
public interface ReplicaView {
/**
* The endpoint information for this replica (hostname, port, rack, etc)
*/
Node endpoint();
/**
* The log end offset for this replica
*/
long logEndOffset();
/**
* The number of milliseconds (if any) since the last time this replica was caught up to the high watermark.
* For a leader replica, this is always zero.
*/
long timeSinceLastCaughtUpMs();
/**
* Comparator for ReplicaView that returns in the order of "most caught up". This is used for deterministic
* selection of a replica when there is a tie from a selector.
*/
static Comparator<ReplicaView> comparator() {
return Comparator.comparingLong(ReplicaView::logEndOffset)
.thenComparing(Comparator.comparingLong(ReplicaView::timeSinceLastCaughtUpMs).reversed())
.thenComparing(replicaInfo -> replicaInfo.endpoint().id());
}
class DefaultReplicaView implements ReplicaView {
private final Node endpoint;
private final long logEndOffset;
private final long timeSinceLastCaughtUpMs;
public DefaultReplicaView(Node endpoint, long logEndOffset, long timeSinceLastCaughtUpMs) {
this.endpoint = endpoint;
this.logEndOffset = logEndOffset;
this.timeSinceLastCaughtUpMs = timeSinceLastCaughtUpMs;
}
@Override
public Node endpoint() {
return endpoint;
}
@Override
public long logEndOffset() {
return logEndOffset;
}
@Override
public long timeSinceLastCaughtUpMs() {
return timeSinceLastCaughtUpMs;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DefaultReplicaView that = (DefaultReplicaView) o;
return logEndOffset == that.logEndOffset &&
Objects.equals(endpoint, that.endpoint) &&
Objects.equals(timeSinceLastCaughtUpMs, that.timeSinceLastCaughtUpMs);
}
@Override
public int hashCode() {
return Objects.hash(endpoint, logEndOffset, timeSinceLastCaughtUpMs);
}
@Override
public String toString() {
return "DefaultReplicaView{" +
"endpoint=" + endpoint +
", logEndOffset=" + logEndOffset +
", timeSinceLastCaughtUpMs=" + timeSinceLastCaughtUpMs +
'}';
}
}
}
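A small usage sketch of the comparator's "most caught up" ordering (broker ids, offsets, and timings below are made up): replicas at the same log end offset are ranked by how recently they caught up, with the broker id as the final tie-breaker.

```java
import java.util.Optional;
import java.util.stream.Stream;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.replica.ReplicaView;
import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView;

public class ReplicaViewComparatorDemo {
    public static void main(String[] args) {
        ReplicaView r1 = new DefaultReplicaView(new Node(1, "host1", 9092, "rack-a"), 100L, 50L);
        ReplicaView r2 = new DefaultReplicaView(new Node(2, "host2", 9092, "rack-b"), 100L, 10L);
        // Same log end offset, so the smaller timeSinceLastCaughtUpMs (replica 2) wins under max().
        Optional<ReplicaView> best = Stream.of(r1, r2).max(ReplicaView.comparator());
        System.out.println(best.map(r -> r.endpoint().id()).orElse(-1)); // prints 2
    }
}
```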

View File

@ -195,6 +195,7 @@ public class FetchRequest extends AbstractRequest {
// V10 bumped up to indicate ZStandard capability. (see KIP-110)
private static final Schema FETCH_REQUEST_V10 = FETCH_REQUEST_V9;
// V11 added rack ID to support read from followers (KIP-392)
private static final Schema FETCH_REQUEST_V11 = new Schema(
REPLICA_ID,
MAX_WAIT_TIME,

View File

@ -144,6 +144,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
LOG_START_OFFSET,
new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
// Introduced in V11 to support read from followers (KIP-392)
private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V6 = new Schema(
PARTITION_ID,
ERROR_CODE,
@ -207,6 +208,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
// V10 bumped up to indicate ZStandard capability. (see KIP-110)
private static final Schema FETCH_RESPONSE_V10 = FETCH_RESPONSE_V9;
// V11 added preferred read replica for each partition response to support read from followers (KIP-392)
private static final Schema FETCH_RESPONSE_V11 = new Schema(
THROTTLE_TIME_MS,
ERROR_CODE,
@ -329,7 +331,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
result = 31 * result + Long.hashCode(highWatermark);
result = 31 * result + Long.hashCode(lastStableOffset);
result = 31 * result + Long.hashCode(logStartOffset);
-result = 31 * result + (preferredReadReplica != null ? preferredReadReplica.hashCode() : 0);
+result = 31 * result + Objects.hashCode(preferredReadReplica);
result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0);
result = 31 * result + (records != null ? records.hashCode() : 0);
return result;

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.replica;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.junit.Test;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.test.TestUtils.assertOptional;
import static org.junit.Assert.assertEquals;
public class ReplicaSelectorTest {
@Test
public void testSameRackSelector() {
TopicPartition tp = new TopicPartition("test", 0);
List<ReplicaView> replicaViewSet = replicaInfoSet();
ReplicaView leader = replicaViewSet.get(0);
PartitionView partitionView = partitionInfo(new HashSet<>(replicaViewSet), leader);
ReplicaSelector selector = new RackAwareReplicaSelector();
Optional<ReplicaView> selected = selector.select(tp, metadata("rack-b"), partitionView);
assertOptional(selected, replicaInfo -> {
assertEquals("Expect replica to be in rack-b", replicaInfo.endpoint().rack(), "rack-b");
assertEquals("Expected replica 3 since it is more caught-up", replicaInfo.endpoint().id(), 3);
});
selected = selector.select(tp, metadata("not-a-rack"), partitionView);
assertOptional(selected, replicaInfo -> {
assertEquals("Expect leader when we can't find any nodes in given rack", replicaInfo, leader);
});
selected = selector.select(tp, metadata("rack-a"), partitionView);
assertOptional(selected, replicaInfo -> {
assertEquals("Expect replica to be in rack-a", replicaInfo.endpoint().rack(), "rack-a");
assertEquals("Expect the leader since it's in rack-a", replicaInfo, leader);
});
}
static List<ReplicaView> replicaInfoSet() {
return Stream.of(
replicaInfo(new Node(0, "host0", 1234, "rack-a"), 4, 0),
replicaInfo(new Node(1, "host1", 1234, "rack-a"), 2, 5),
replicaInfo(new Node(2, "host2", 1234, "rack-b"), 3, 3),
replicaInfo(new Node(3, "host3", 1234, "rack-b"), 4, 2)
).collect(Collectors.toList());
}
static ReplicaView replicaInfo(Node node, long logOffset, long timeSinceLastCaughtUpMs) {
return new ReplicaView.DefaultReplicaView(node, logOffset, timeSinceLastCaughtUpMs);
}
static PartitionView partitionInfo(Set<ReplicaView> replicaViewSet, ReplicaView leader) {
return new PartitionView.DefaultPartitionView(replicaViewSet, leader);
}
static ClientMetadata metadata(String rack) {
return new ClientMetadata.DefaultClientMetadata(rack, "test-client",
InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, "TEST");
}
}

View File

@ -519,16 +519,18 @@ class Partition(val topicPartition: TopicPartition,
if (isNewLeader) {
// construct the high watermark metadata for the new leader replica
-leaderLog.maybeFetchHighWatermarkOffsetMetadata()
+leaderLog.initializeHighWatermarkOffsetMetadata()
// mark local replica as the leader after converting hw
leaderReplicaIdOpt = Some(localBrokerId)
// reset log end offset for remote replicas
-remoteReplicas.foreach { _.updateFetchState(
+remoteReplicas.foreach { replica =>
+replica.updateFetchState(
followerFetchOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
followerStartOffset = Log.UnknownOffset,
followerFetchTimeMs = 0L,
leaderEndOffset = Log.UnknownOffset
)
+replica.updateLastSentHighWatermark(0L)
}
}
// we may need to increment high watermark since ISR could be down to 1

View File

@ -42,6 +42,10 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
// the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
@volatile private[this] var _lastCaughtUpTimeMs = 0L
// highWatermark is the leader's high watermark after the most recent FetchRequest from this follower. This is
// used to determine the maximum HW this follower knows about. See KIP-392
@volatile private[this] var _lastSentHighWatermark = 0L
def logStartOffset: Long = _logStartOffset
def logEndOffsetMetadata: LogOffsetMetadata = _logEndOffsetMetadata
@ -50,6 +54,8 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs
def lastSentHighWatermark: Long = _lastSentHighWatermark
/*
* If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
* set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
@ -78,6 +84,19 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
trace(s"Updated state of replica to $this") trace(s"Updated state of replica to $this")
} }
/**
* Update the high watermark of this remote replica. This is used to track what we think is the last known HW to
* a remote follower. Since this is recorded when we send a response, there is no way to guarantee that the follower
* actually receives this HW. So we consider this to be an upper bound on what the follower knows.
*
* When handling fetches, the last sent high watermark for a replica is checked to see if we should return immediately
* in order to propagate the HW more expeditiously. See KIP-392
*/
def updateLastSentHighWatermark(highWatermark: Long): Unit = {
_lastSentHighWatermark = highWatermark
trace(s"Updated HW of replica to $highWatermark")
}
def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long): Unit = {
lastFetchLeaderLogEndOffset = curLeaderLogEndOffset
lastFetchTimeMs = curTimeMs
@ -96,6 +115,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
replicaString.append(s", logEndOffsetMetadata=$logEndOffsetMetadata") replicaString.append(s", logEndOffsetMetadata=$logEndOffsetMetadata")
replicaString.append(s", lastFetchLeaderLogEndOffset=$lastFetchLeaderLogEndOffset") replicaString.append(s", lastFetchLeaderLogEndOffset=$lastFetchLeaderLogEndOffset")
replicaString.append(s", lastFetchTimeMs=$lastFetchTimeMs") replicaString.append(s", lastFetchTimeMs=$lastFetchTimeMs")
replicaString.append(s", lastSentHighWatermark=$lastSentHighWatermark")
replicaString.append(")") replicaString.append(")")
replicaString.toString replicaString.toString
} }

View File

@ -328,12 +328,14 @@ class Log(@volatile var dir: File,
* Convert hw to local offset metadata by reading the log at the hw offset.
* If the hw offset is out of range, return the first offset of the first log segment as the offset metadata.
*/
-def maybeFetchHighWatermarkOffsetMetadata(): Unit = {
+def initializeHighWatermarkOffsetMetadata(): Unit = {
if (highWatermarkMetadata.messageOffsetOnly) {
+lock.synchronized {
highWatermarkMetadata = convertToOffsetMetadata(highWatermark).getOrElse {
convertToOffsetMetadata(logStartOffset).getOrElse {
val firstSegmentOffset = logSegments.head.baseOffset
LogOffsetMetadata(firstSegmentOffset, firstSegmentOffset, 0)
}
}
+}
}
@ -357,12 +359,40 @@ class Log(@volatile var dir: File,
def lastStableOffsetLag: Long = highWatermark - lastStableOffset
/**
* Fully materialize and return an offset snapshot including segment position info. This method will update
* the LogOffsetMetadata for the high watermark and last stable offset if they are message-only. Throws an
* offset out of range error if the segment info cannot be loaded.
*/
def offsetSnapshot: LogOffsetSnapshot = {
var highWatermark = _highWatermarkMetadata
if (highWatermark.messageOffsetOnly) {
lock.synchronized {
val fullOffset = convertToOffsetMetadataOrThrow(_highWatermarkMetadata.messageOffset)
_highWatermarkMetadata = fullOffset
highWatermark = _highWatermarkMetadata
}
}
var lastStable: LogOffsetMetadata = lastStableOffsetMetadata
if (lastStable.messageOffsetOnly) {
lock synchronized {
firstUnstableOffset match {
case None => highWatermark
case Some(offsetMetadata) =>
val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
firstUnstableOffset = Some(fullOffset)
lastStable = fullOffset
}
}
}
LogOffsetSnapshot(
-logStartOffset = logStartOffset,
-logEndOffset = logEndOffsetMetadata,
-highWatermark = highWatermarkMetadata,
-lastStableOffset = lastStableOffsetMetadata)
+logStartOffset,
+logEndOffsetMetadata,
+highWatermark,
+lastStable
+)
}
private val tags = {
@ -1534,17 +1564,25 @@ class Log(@volatile var dir: File,
*/
def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {
try {
-val fetchDataInfo = read(offset,
-maxLength = 1,
-maxOffset = None,
-minOneMessage = false,
-includeAbortedTxns = false)
-Some(fetchDataInfo.fetchOffsetMetadata)
+Some(convertToOffsetMetadataOrThrow(offset))
} catch {
case _: OffsetOutOfRangeException => None
}
}
/**
* Given a message offset, find its corresponding offset metadata in the log.
* If the message offset is out of range, throw an OffsetOutOfRangeException
*/
def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
val fetchDataInfo = read(offset,
maxLength = 1,
maxOffset = None,
minOneMessage = false,
includeAbortedTxns = false)
fetchDataInfo.fetchOffsetMetadata
}
/**
* Delete any log segments matching the given predicate function,
* starting with the oldest segment and moving forward until a segment doesn't match.

View File

@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import scala.collection._
@ -59,6 +60,7 @@ class DelayedFetch(delayMs: Long,
fetchMetadata: FetchMetadata,
replicaManager: ReplicaManager,
quota: ReplicaQuota,
clientMetadata: Option[ClientMetadata],
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit)
extends DelayedOperation(delayMs) {
@ -71,7 +73,7 @@ class DelayedFetch(delayMs: Long,
* Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
* Case E: The partition is in an offline log directory on this broker
* Case F: This broker is the leader, but the requested epoch is now fenced
* Case G: The high watermark on this broker has changed within a FetchSession, need to propagate to follower (KIP-392)
* Upon completion, should return whatever data is available for each valid partition
*/
override def tryComplete(): Boolean = {
@ -114,6 +116,14 @@ class DelayedFetch(delayMs: Long,
accumulatedSize += bytesAvailable
}
}
if (fetchMetadata.isFromFollower) {
// Case G check if the follower has the latest HW from the leader
if (partition.getReplica(fetchMetadata.replicaId)
.exists(r => offsetSnapshot.highWatermark.messageOffset > r.lastSentHighWatermark)) {
return forceComplete()
}
}
}
} catch {
case _: KafkaStorageException => // Case E
@ -157,11 +167,12 @@ class DelayedFetch(delayMs: Long,
fetchMaxBytes = fetchMetadata.fetchMaxBytes,
hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
readPartitionInfo = fetchMetadata.fetchPartitionStatus.map { case (tp, status) => tp -> status.fetchInfo },
clientMetadata = clientMetadata,
quota = quota)
val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
-result.lastStableOffset, result.info.abortedTransactions)
+result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica)
}
responseCallback(fetchPartitionData)

View File

@ -144,6 +144,10 @@ class CachedPartition(val topic: String,
if (updateResponseData)
localLogStartOffset = respData.logStartOffset
}
if (respData.preferredReadReplica.isPresent) {
// If the broker computed a preferred read replica, we need to include it in the response
mustRespond = true
}
if (respData.error.code != 0) {
// Partitions with errors are always included in the response.
// We also set the cached highWatermark to an invalid offset, -1.

View File

@ -69,6 +69,8 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Send}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
@ -82,11 +84,13 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{Node, TopicPartition}
import scala.compat.java8.OptionConverters._
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
/**
* Logic to handle the various Kafka requests
*/
@ -577,6 +581,18 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchRequest.toForget,
fetchRequest.isFromFollower)
val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) {
// Fetch API version 11 added preferred replica logic
Some(new DefaultClientMetadata(
fetchRequest.rackId,
clientId,
request.context.clientAddress,
request.context.principal,
request.context.listenerName.value))
} else {
None
}
def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = {
new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
@ -650,7 +666,8 @@ class KafkaApis(val requestChannel: RequestChannel,
// down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
// client.
new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
-partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
+partitionData.lastStableOffset, partitionData.logStartOffset,
+partitionData.preferredReadReplica, partitionData.abortedTransactions,
new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
} catch {
case e: UnsupportedCompressionTypeException =>
@ -659,7 +676,8 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
case None => new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
-partitionData.lastStableOffset, partitionData.logStartOffset, partitionData.abortedTransactions,
+partitionData.lastStableOffset, partitionData.logStartOffset,
+partitionData.preferredReadReplica, partitionData.abortedTransactions,
unconvertedRecords)
}
}
@ -672,7 +690,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
partitions.put(tp, new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset,
-data.logStartOffset, abortedTransactions, data.records))
+data.logStartOffset, data.preferredReadReplica.map(int2Integer).asJava,
+abortedTransactions, data.records))
}
erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
@ -769,7 +788,8 @@ class KafkaApis(val requestChannel: RequestChannel,
interesting,
replicationQuota(fetchRequest),
processResponseCallback,
-fetchRequest.isolationLevel)
+fetchRequest.isolationLevel,
+clientMetadata)
}
}

View File

@ -370,6 +370,7 @@ object KafkaConfig {
val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol"
val InterBrokerProtocolVersionProp = "inter.broker.protocol.version"
val InterBrokerListenerNameProp = "inter.broker.listener.name"
val ReplicaSelectorClassProp = "replica.selector.class"
/** ********* Controlled shutdown configuration ***********/
val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
@ -699,6 +700,7 @@ object KafkaConfig {
" Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list." " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list."
val InterBrokerListenerNameDoc = s"Name of listener used for communication between brokers. If this is unset, the listener name is defined by $InterBrokerSecurityProtocolProp. " + val InterBrokerListenerNameDoc = s"Name of listener used for communication between brokers. If this is unset, the listener name is defined by $InterBrokerSecurityProtocolProp. " +
s"It is an error to set this and $InterBrokerSecurityProtocolProp properties at the same time." s"It is an error to set this and $InterBrokerSecurityProtocolProp properties at the same time."
val ReplicaSelectorClassDoc = "The fully qualified class name that implements ReplicaSelector. This is used by the broker to find the preferred read replica. By default, we use an implementation that returns the leader."
/** ********* Controlled shutdown configuration ***********/ /** ********* Controlled shutdown configuration ***********/
val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens" val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens"
val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying." val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying."
@ -966,6 +968,7 @@ object KafkaConfig {
.define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
.define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, ApiVersionValidator, MEDIUM, InterBrokerProtocolVersionDoc)
.define(InterBrokerListenerNameProp, STRING, null, MEDIUM, InterBrokerListenerNameDoc)
.define(ReplicaSelectorClassProp, STRING, null, MEDIUM, ReplicaSelectorClassDoc)
/** ********* Controlled shutdown configuration ***********/
.define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
@ -1186,6 +1189,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
/***************** rack configuration **************/
val rack = Option(getString(KafkaConfig.RackProp))
val replicaSelectorClassName = Option(getString(KafkaConfig.ReplicaSelectorClassProp))
/** ********* Log Configuration ***********/
val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)

View File

@ -33,6 +33,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
/**
* A cache for the state (e.g., current leader) of each partition. This cache is updated through
* UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
@ -195,6 +196,24 @@ class MetadataCache(brokerId: Int) extends Logging {
}
}
def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
val snapshot = metadataSnapshot
snapshot.partitionStates.get(tp.topic()).flatMap(_.get(tp.partition())).map { partitionInfo =>
val replicaIds = partitionInfo.basePartitionState.replicas
replicaIds.asScala
.map(replicaId => replicaId.intValue() -> {
snapshot.aliveBrokers.get(replicaId.longValue()) match {
case Some(broker) =>
broker.getNode(listenerName).getOrElse(Node.noNode())
case None =>
Node.noNode()
}}).toMap
.filter(pair => pair match {
case (_, node) => !node.isEmpty
})
}.getOrElse(Map.empty[Int, Node])
}
def getControllerId: Option[Int] = metadataSnapshot.controllerId
def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {

View File

@ -22,7 +22,7 @@ import java.util.Optional
import kafka.api.Request
import kafka.cluster.BrokerEndPoint
-import kafka.log.{LogAppendInfo, LogOffsetSnapshot}
+import kafka.log.LogAppendInfo
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UnboundedQuota
import org.apache.kafka.common.TopicPartition
@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.Records
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.FetchResponse.PartitionData
-import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}
+import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, mutable}
@ -89,7 +89,8 @@ class ReplicaAlterLogDirsThread(name: String,
request.fetchData.asScala.toSeq,
UnboundedQuota,
processResponseCallback,
-request.isolationLevel)
+request.isolationLevel,
+None)
if (partitionData == null)
throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}")
@ -121,18 +122,13 @@ class ReplicaAlterLogDirsThread(name: String,
}
override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
-val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch)
-offsetSnapshot.logStartOffset
+val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
+partition.localLogOrException.logStartOffset
}
override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
-val offsetSnapshot = offsetSnapshotFromCurrentReplica(topicPartition, leaderEpoch)
-offsetSnapshot.logEndOffset.messageOffset
-}
-private def offsetSnapshotFromCurrentReplica(topicPartition: TopicPartition, leaderEpoch: Int): LogOffsetSnapshot = {
val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
-partition.fetchOffsetSnapshot(Optional.of[Integer](leaderEpoch), fetchOnlyFromLeader = false)
+partition.localLogOrException.logEndOffset
}
/**

View File

@ -34,12 +34,15 @@ import kafka.utils._
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Node
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.PartitionView.DefaultPartitionView
import org.apache.kafka.common.requests.DescribeLogDirsResponse.{LogDirInfo, ReplicaInfo}
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.requests.FetchRequest.PartitionData
@ -47,7 +50,10 @@ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{ApiError, DeleteRecordsResponse, DescribeLogDirsResponse, EpochEndOffset, IsolationLevel, LeaderAndIsrRequest, LeaderAndIsrResponse, OffsetsForLeaderEpochRequest, StopReplicaRequest, UpdateMetadataRequest}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
import org.apache.kafka.common.replica.{ClientMetadata, _}
import scala.compat.java8.OptionConverters._
import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, mutable}
@ -75,7 +81,8 @@ case class LogDeleteRecordsResult(requestedOffset: Long, lowWatermark: Long, exc
 * @param readSize amount of data that was read from the log i.e. size of the fetch
 * @param isReadFromLogEnd true if the request read up to the log end offset snapshot
 *                         when the read was initiated, false otherwise
-* @param error Exception if error encountered while reading from the log
+* @param preferredReadReplica the preferred read replica to be used for future fetches
+* @param exception Exception if error encountered while reading from the log
 */
case class LogReadResult(info: FetchDataInfo,
                         highWatermark: Long,
@ -85,6 +92,8 @@ case class LogReadResult(info: FetchDataInfo,
                         fetchTimeMs: Long,
                         readSize: Int,
                         lastStableOffset: Option[Long],
+                        preferredReadReplica: Option[Int] = None,
+                        followerNeedsHwUpdate: Boolean = false,
                         exception: Option[Throwable] = None) {

  def error: Errors = exception match {

@ -106,7 +115,8 @@ case class FetchPartitionData(error: Errors = Errors.NONE,
                              logStartOffset: Long,
                              records: Records,
                              lastStableOffset: Option[Long],
-                             abortedTransactions: Option[List[AbortedTransaction]])
+                             abortedTransactions: Option[List[AbortedTransaction]],
+                             preferredReadReplica: Option[Int])

/**
@ -216,6 +226,8 @@ class ReplicaManager(val config: KafkaConfig,
} }
} }
val replicaSelectorOpt: Option[ReplicaSelector] = createReplicaSelector()
val leaderCount = newGauge( val leaderCount = newGauge(
"LeaderCount", "LeaderCount",
new Gauge[Int] { new Gauge[Int] {
@ -807,8 +819,9 @@ class ReplicaManager(val config: KafkaConfig,
} }
  /**
-   * Fetch messages from the leader replica, and wait until enough data can be fetched and return;
-   * the callback function will be triggered either when timeout or required fetch info is satisfied
+   * Fetch messages from a replica, and wait until enough data can be fetched and return;
+   * the callback function will be triggered either when timeout or required fetch info is satisfied.
+   * Consumers may fetch from any replica, but followers can only fetch from the leader.
   */
def fetchMessages(timeout: Long, def fetchMessages(timeout: Long,
replicaId: Int, replicaId: Int,
@ -818,9 +831,9 @@ class ReplicaManager(val config: KafkaConfig,
fetchInfos: Seq[(TopicPartition, PartitionData)], fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota, quota: ReplicaQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit, responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
isolationLevel: IsolationLevel) { isolationLevel: IsolationLevel,
clientMetadata: Option[ClientMetadata]) {
val isFromFollower = Request.isValidBrokerId(replicaId) val isFromFollower = Request.isValidBrokerId(replicaId)
val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId
val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId) val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId)
FetchLogEnd FetchLogEnd
@ -829,16 +842,16 @@ class ReplicaManager(val config: KafkaConfig,
else else
FetchHighWatermark FetchHighWatermark
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = { def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
val result = readFromLocalLog( val result = readFromLocalLog(
replicaId = replicaId, replicaId = replicaId,
fetchOnlyFromLeader = fetchOnlyFromLeader, fetchOnlyFromLeader = isFromFollower,
fetchIsolation = fetchIsolation, fetchIsolation = fetchIsolation,
fetchMaxBytes = fetchMaxBytes, fetchMaxBytes = fetchMaxBytes,
hardMaxBytesLimit = hardMaxBytesLimit, hardMaxBytesLimit = hardMaxBytesLimit,
readPartitionInfo = fetchInfos, readPartitionInfo = fetchInfos,
quota = quota) quota = quota,
clientMetadata = clientMetadata)
if (isFromFollower) updateFollowerFetchState(replicaId, result) if (isFromFollower) updateFollowerFetchState(replicaId, result)
else result else result
} }
@ -849,23 +862,37 @@ class ReplicaManager(val config: KafkaConfig,
var bytesReadable: Long = 0 var bytesReadable: Long = 0
var errorReadingData = false var errorReadingData = false
val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult] val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
var anyPartitionsNeedHwUpdate = false
logReadResults.foreach { case (topicPartition, logReadResult) => logReadResults.foreach { case (topicPartition, logReadResult) =>
if (logReadResult.error != Errors.NONE) if (logReadResult.error != Errors.NONE)
errorReadingData = true errorReadingData = true
bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
logReadResultMap.put(topicPartition, logReadResult) logReadResultMap.put(topicPartition, logReadResult)
if (isFromFollower && logReadResult.followerNeedsHwUpdate) {
anyPartitionsNeedHwUpdate = true
}
} }
// Wrap the given callback function with another function that will update the HW for the remote follower
val updateHwAndThenCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit =
(fetchPartitionData: Seq[(TopicPartition, FetchPartitionData)]) => {
fetchPartitionData.foreach {
case (tp, partitionData) => updateFollowerHighWatermark(tp, replicaId, partitionData.highWatermark)
}
responseCallback(fetchPartitionData)
}
    // respond immediately if 1) fetch request does not want to wait
    // 2) fetch request does not require any data
    // 3) has enough data to respond
    // 4) some error happens while reading data
-    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
+    // 5) all the requested partitions need HW update
+    if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || anyPartitionsNeedHwUpdate) {
      val fetchPartitionData = logReadResults.map { case (tp, result) =>
        tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
-          result.lastStableOffset, result.info.abortedTransactions)
+          result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica)
      }
-      responseCallback(fetchPartitionData)
+      updateHwAndThenCallback(fetchPartitionData)
    } else {
// construct the fetch results from the read results // construct the fetch results from the read results
val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)] val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
@ -875,9 +902,10 @@ class ReplicaManager(val config: KafkaConfig,
fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
}) })
} }
val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader, val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, isFromFollower,
fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus) fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback) val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
updateHwAndThenCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) } val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
@ -898,7 +926,8 @@ class ReplicaManager(val config: KafkaConfig,
fetchMaxBytes: Int, fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean, hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicPartition, PartitionData)], readPartitionInfo: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = { quota: ReplicaQuota,
clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = {
def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = { def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
val offset = fetchInfo.fetchOffset val offset = fetchInfo.fetchOffset
@ -917,35 +946,64 @@ class ReplicaManager(val config: KafkaConfig,
        val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
        val fetchTimeMs = time.milliseconds

-        // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
-        val readInfo = partition.readRecords(
-          fetchOffset = fetchInfo.fetchOffset,
-          currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
-          maxBytes = adjustedMaxBytes,
-          fetchIsolation = fetchIsolation,
-          fetchOnlyFromLeader = fetchOnlyFromLeader,
-          minOneMessage = minOneMessage)
-
-        val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) {
-          // If the partition is being throttled, simply return an empty set.
-          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
-        } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
-          // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
-          // progress in such cases and don't need to report a `RecordTooLargeException`
-          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
+        // If we are the leader, determine the preferred read-replica
+        val preferredReadReplica = clientMetadata.flatMap(
+          metadata => findPreferredReadReplica(tp, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs))
+
+        if (preferredReadReplica.isDefined) {
+          replicaSelectorOpt.foreach{ selector =>
+            debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
+              s"${preferredReadReplica.get} for $clientMetadata")
+          }
+          // If a preferred read-replica is set, skip the read
+          val offsetSnapshot: LogOffsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, false)
+          LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
+            highWatermark = offsetSnapshot.highWatermark.messageOffset,
+            leaderLogStartOffset = offsetSnapshot.logStartOffset,
+            leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
+            followerLogStartOffset = followerLogStartOffset,
+            fetchTimeMs = -1L,
+            readSize = 0,
+            lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
+            preferredReadReplica = preferredReadReplica,
+            exception = None)
        } else {
-          readInfo.fetchedData
-        }
-
-        LogReadResult(info = fetchDataInfo,
-          highWatermark = readInfo.highWatermark,
-          leaderLogStartOffset = readInfo.logStartOffset,
-          leaderLogEndOffset = readInfo.logEndOffset,
-          followerLogStartOffset = followerLogStartOffset,
-          fetchTimeMs = fetchTimeMs,
-          readSize = adjustedMaxBytes,
-          lastStableOffset = Some(readInfo.lastStableOffset),
-          exception = None)
+          // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
+          val readInfo: LogReadInfo = partition.readRecords(
+            fetchOffset = fetchInfo.fetchOffset,
+            currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
+            maxBytes = adjustedMaxBytes,
+            fetchIsolation = fetchIsolation,
+            fetchOnlyFromLeader = fetchOnlyFromLeader,
+            minOneMessage = minOneMessage)
+
+          // Check if the HW known to the follower is behind the actual HW
+          val followerNeedsHwUpdate: Boolean = partition.getReplica(replicaId)
+            .exists(replica => replica.lastSentHighWatermark < readInfo.highWatermark)
+
+          val fetchDataInfo = if (shouldLeaderThrottle(quota, tp, replicaId)) {
+            // If the partition is being throttled, simply return an empty set.
+            FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
+          } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
+            // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
+            // progress in such cases and don't need to report a `RecordTooLargeException`
+            FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
+          } else {
+            readInfo.fetchedData
+          }
+
+          LogReadResult(info = fetchDataInfo,
+            highWatermark = readInfo.highWatermark,
+            leaderLogStartOffset = readInfo.logStartOffset,
+            leaderLogEndOffset = readInfo.logEndOffset,
+            followerLogStartOffset = followerLogStartOffset,
+            fetchTimeMs = fetchTimeMs,
+            readSize = adjustedMaxBytes,
+            lastStableOffset = Some(readInfo.lastStableOffset),
+            preferredReadReplica = preferredReadReplica,
+            followerNeedsHwUpdate = followerNeedsHwUpdate,
+            exception = None)
+        }
      } catch {
// NOTE: Failed fetch requests metric is not incremented for known exceptions since it // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
// is supposed to indicate un-expected failure of a broker in handling a fetch request // is supposed to indicate un-expected failure of a broker in handling a fetch request
@ -1000,6 +1058,59 @@ class ReplicaManager(val config: KafkaConfig,
result result
} }
/**
* Using the configured [[ReplicaSelector]], determine the preferred read replica for a partition given the
* client metadata, the requested offset, and the current set of replicas. If the preferred read replica is the
* leader, return None
*/
def findPreferredReadReplica(tp: TopicPartition,
clientMetadata: ClientMetadata,
replicaId: Int,
fetchOffset: Long,
currentTimeMs: Long): Option[Int] = {
val partition = getPartitionOrException(tp, expectLeader = false)
if (partition.isLeader) {
if (Request.isValidBrokerId(replicaId)) {
// Don't look up preferred for follower fetches via normal replication
Option.empty
} else {
replicaSelectorOpt.flatMap { replicaSelector =>
val replicaEndpoints = metadataCache.getPartitionReplicaEndpoints(tp, new ListenerName(clientMetadata.listenerName))
var replicaInfoSet: Set[ReplicaView] = partition.remoteReplicas
// Exclude replicas that don't have the requested offset (whether or not they're in the ISR)
.filter(replica => replica.logEndOffset >= fetchOffset)
.filter(replica => replica.logStartOffset <= fetchOffset)
.map(replica => new DefaultReplicaView(
replicaEndpoints.getOrElse(replica.brokerId, Node.noNode()),
replica.logEndOffset,
currentTimeMs - replica.lastCaughtUpTimeMs
))
if (partition.leaderReplicaIdOpt.isDefined) {
val leaderReplica: ReplicaView = partition.leaderReplicaIdOpt
.map(replicaId => replicaEndpoints.getOrElse(replicaId, Node.noNode()))
.map(leaderNode => new DefaultReplicaView(leaderNode, partition.localLogOrException.logEndOffset, 0L))
.get
replicaInfoSet ++= Set(leaderReplica)
val partitionInfo = new DefaultPartitionView(replicaInfoSet.asJava, leaderReplica)
replicaSelector.select(tp, clientMetadata, partitionInfo).asScala
.filter(!_.endpoint.isEmpty)
// Even though the replica selector can return the leader, we don't want to send it out with the
// FetchResponse, so we exclude it here
.filter(!_.equals(leaderReplica))
.map(_.endpoint.id)
} else {
None
}
}
}
} else {
None
}
}
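The selection above delegates to the pluggable `ReplicaSelector`. As a minimal sketch of what a custom selector could look like, here is a rack-matching selector that falls back to the leader. It assumes the KIP-392 interface shapes implied by this diff (the `select(tp, clientMetadata, partitionInfo)` call and `DefaultReplicaView`/`DefaultPartitionView` above); the class name and the accessor names `replicas`, `leader`, `endpoint` and `rackId` are assumptions of this sketch, not confirmed API, and the sketch is not part of the patch.

```scala
import java.util.Optional

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, ReplicaSelector, ReplicaView}

import scala.collection.JavaConverters._

// Hypothetical example selector: prefer a replica in the same rack as the client,
// otherwise fall back to the leader. Accessor names are assumed, see the note above.
class SameRackReplicaSelector extends ReplicaSelector {
  override def select(topicPartition: TopicPartition,
                      clientMetadata: ClientMetadata,
                      partitionView: PartitionView): Optional[ReplicaView] = {
    val sameRackReplica = partitionView.replicas.asScala.find { replica =>
      replica.endpoint.rack != null && replica.endpoint.rack == clientMetadata.rackId
    }
    Optional.ofNullable(sameRackReplica.getOrElse(partitionView.leader))
  }
}
```

Because `findPreferredReadReplica` drops the leader before building the response, a selector that returns the leader effectively tells the consumer to keep fetching from the leader.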
  /**
   * To avoid ISR thrashing, we only throttle a replica on the leader if it's in the throttled replica list,
   * the quota is exceeded and the replica is not in sync.
@ -1424,6 +1535,15 @@ class ReplicaManager(val config: KafkaConfig,
} }
} }
private def updateFollowerHighWatermark(topicPartition: TopicPartition, followerId: Int, highWatermark: Long): Unit = {
nonOfflinePartition(topicPartition).flatMap(_.getReplica(followerId)) match {
case Some(replica) => replica.updateLastSentHighWatermark(highWatermark)
case None =>
warn(s"While updating the HW for follower $followerId for partition $topicPartition, " +
s"the replica could not be found.")
}
}
private def leaderPartitionsIterator: Iterator[Partition] = private def leaderPartitionsIterator: Iterator[Partition] =
nonOfflinePartitionsIterator.filter(_.leaderLogIfLocal.isDefined) nonOfflinePartitionsIterator.filter(_.leaderLogIfLocal.isDefined)
@ -1516,6 +1636,7 @@ class ReplicaManager(val config: KafkaConfig,
delayedElectLeaderPurgatory.shutdown() delayedElectLeaderPurgatory.shutdown()
if (checkpointHW) if (checkpointHW)
checkpointHighWatermarks() checkpointHighWatermarks()
replicaSelectorOpt.foreach(_.close)
info("Shut down completely") info("Shut down completely")
} }
@ -1527,6 +1648,14 @@ class ReplicaManager(val config: KafkaConfig,
new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats) new ReplicaAlterLogDirsManager(config, this, quotaManager, brokerTopicStats)
} }
protected def createReplicaSelector(): Option[ReplicaSelector] = {
config.replicaSelectorClassName.map { className =>
val tmpReplicaSelector: ReplicaSelector = CoreUtils.createObject[ReplicaSelector](className)
tmpReplicaSelector.configure(config.originals())
tmpReplicaSelector
}
}
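A usage note, not part of the patch: `createReplicaSelector` only builds a selector when the new `replica.selector.class` broker config is set, so `replicaSelectorOpt` stays empty by default (as `testDefaultReplicaSelector` below asserts). A rough sketch of the end-to-end wiring, assuming the KIP-392 config names (`replica.selector.class` and `broker.rack` on the broker, `client.rack` on the consumer) and the rack-aware selector that ships with the KIP:

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

// Broker side (server.properties), assumed KIP-392 config names:
//   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
//   broker.rack=rack-a

// Consumer side: advertise the client's rack so ClientMetadata.rackId is populated.
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group")
props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
val consumer = new KafkaConsumer[String, String](props)
```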
def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, EpochEndOffset] = { def lastOffsetForLeaderEpoch(requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]): Map[TopicPartition, EpochEndOffset] = {
requestedEpochInfo.map { case (tp, partitionData) => requestedEpochInfo.map { case (tp, partitionData) =>
val epochEndOffset = getPartition(tp) match { val epochEndOffset = getPartition(tp) match {


@ -20,7 +20,8 @@ import java.util.Optional
import scala.collection.Seq import scala.collection.Seq
import kafka.cluster.Partition import kafka.cluster.{Partition, Replica}
import kafka.log.LogOffsetSnapshot
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.FencedLeaderEpochException import org.apache.kafka.common.errors.FencedLeaderEpochException
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
@ -58,6 +59,7 @@ class DelayedFetchTest extends EasyMockSupport {
fetchMetadata = fetchMetadata, fetchMetadata = fetchMetadata,
replicaManager = replicaManager, replicaManager = replicaManager,
quota = replicaQuota, quota = replicaQuota,
clientMetadata = None,
responseCallback = callback) responseCallback = callback)
val partition: Partition = mock(classOf[Partition]) val partition: Partition = mock(classOf[Partition])
@ -79,6 +81,89 @@ class DelayedFetchTest extends EasyMockSupport {
assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.error) assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.error)
} }
def checkCompleteWhenFollowerLaggingHW(followerHW: Option[Long], checkResult: DelayedFetch => Unit): Unit = {
val topicPartition = new TopicPartition("topic", 0)
val fetchOffset = 500L
val logStartOffset = 0L
val currentLeaderEpoch = Optional.of[Integer](10)
val replicaId = 1
val fetchStatus = FetchPartitionStatus(
startOffsetMetadata = LogOffsetMetadata(fetchOffset),
fetchInfo = new FetchRequest.PartitionData(fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
val fetchMetadata = buildFetchMetadata(replicaId, topicPartition, fetchStatus)
var fetchResultOpt: Option[FetchPartitionData] = None
def callback(responses: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
fetchResultOpt = Some(responses.head._2)
}
val delayedFetch = new DelayedFetch(
delayMs = 500,
fetchMetadata = fetchMetadata,
replicaManager = replicaManager,
quota = replicaQuota,
clientMetadata = None,
responseCallback = callback
)
val partition: Partition = mock(classOf[Partition])
EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
.andReturn(partition)
EasyMock.expect(partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true))
.andReturn(
LogOffsetSnapshot(
logStartOffset = 0,
logEndOffset = new LogOffsetMetadata(500L),
highWatermark = new LogOffsetMetadata(480L),
lastStableOffset = new LogOffsetMetadata(400L)))
expectReadFromReplica(replicaId, topicPartition, fetchStatus.fetchInfo)
val follower = new Replica(replicaId, topicPartition)
followerHW.foreach(hw => {
follower.updateFetchState(LogOffsetMetadata.UnknownOffsetMetadata, 0L, 0L, 0L)
follower.updateLastSentHighWatermark(hw)
})
EasyMock.expect(partition.getReplica(replicaId))
.andReturn(Some(follower))
replayAll()
checkResult.apply(delayedFetch)
}
@Test
def testCompleteWhenFollowerLaggingHW(): Unit = {
// No HW from the follower, should complete
resetAll
checkCompleteWhenFollowerLaggingHW(None, delayedFetch => {
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
})
// A higher HW from the follower (shouldn't actually be possible)
resetAll
checkCompleteWhenFollowerLaggingHW(Some(500), delayedFetch => {
assertFalse(delayedFetch.tryComplete())
assertFalse(delayedFetch.isCompleted)
})
// An equal HW from follower
resetAll
checkCompleteWhenFollowerLaggingHW(Some(480), delayedFetch => {
assertFalse(delayedFetch.tryComplete())
assertFalse(delayedFetch.isCompleted)
})
// A lower HW from follower, should complete the fetch
resetAll
checkCompleteWhenFollowerLaggingHW(Some(470), delayedFetch => {
assertTrue(delayedFetch.tryComplete())
assertTrue(delayedFetch.isCompleted)
})
}
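Read together, the four cases above pin down the new completion rule for follower fetches: complete as soon as the follower has either never been sent a high watermark or was last sent one below the leader's. The snippet below only illustrates that rule as a standalone predicate; it is not the actual `DelayedFetch.tryComplete` code, which does not appear in this diff.

```scala
// Illustrative only: the completion rule exercised by testCompleteWhenFollowerLaggingHW,
// written as a hypothetical helper. leaderHighWatermark corresponds to the 480L snapshot
// used in the test.
def followerHwIsBehind(lastSentHighWatermark: Option[Long], leaderHighWatermark: Long): Boolean =
  lastSentHighWatermark.forall(_ < leaderHighWatermark)

// followerHwIsBehind(None, 480L)       => true  (complete: follower never got a HW)
// followerHwIsBehind(Some(500L), 480L) => false
// followerHwIsBehind(Some(480L), 480L) => false
// followerHwIsBehind(Some(470L), 480L) => true  (complete: follower is behind)
```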
private def buildFetchMetadata(replicaId: Int, private def buildFetchMetadata(replicaId: Int,
topicPartition: TopicPartition, topicPartition: TopicPartition,
fetchStatus: FetchPartitionStatus): FetchMetadata = { fetchStatus: FetchPartitionStatus): FetchMetadata = {
@ -103,10 +188,38 @@ class DelayedFetchTest extends EasyMockSupport {
fetchMaxBytes = maxBytes, fetchMaxBytes = maxBytes,
hardMaxBytesLimit = false, hardMaxBytesLimit = false,
readPartitionInfo = Seq((topicPartition, fetchPartitionData)), readPartitionInfo = Seq((topicPartition, fetchPartitionData)),
clientMetadata = None,
quota = replicaQuota)) quota = replicaQuota))
.andReturn(Seq((topicPartition, buildReadResultWithError(error)))) .andReturn(Seq((topicPartition, buildReadResultWithError(error))))
} }
private def expectReadFromReplica(replicaId: Int,
topicPartition: TopicPartition,
fetchPartitionData: FetchRequest.PartitionData): Unit = {
val result = LogReadResult(
exception = None,
info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
highWatermark = -1L,
leaderLogStartOffset = -1L,
leaderLogEndOffset = -1L,
followerLogStartOffset = -1L,
fetchTimeMs = -1L,
readSize = -1,
lastStableOffset = None)
EasyMock.expect(replicaManager.readFromLocalLog(
replicaId = replicaId,
fetchOnlyFromLeader = true,
fetchIsolation = FetchLogEnd,
fetchMaxBytes = maxBytes,
hardMaxBytesLimit = false,
readPartitionInfo = Seq((topicPartition, fetchPartitionData)),
clientMetadata = None,
quota = replicaQuota))
.andReturn(Seq((topicPartition, result))).anyTimes()
}
private def buildReadResultWithError(error: Errors): LogReadResult = { private def buildReadResultWithError(error: Errors): LogReadResult = {
LogReadResult( LogReadResult(
exception = Some(error.exception), exception = Some(error.exception),


@ -242,9 +242,11 @@ class PartitionTest {
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)
def assertSnapshotError(expectedError: Errors, currentLeaderEpoch: Optional[Integer]): Unit = { def assertSnapshotError(expectedError: Errors, currentLeaderEpoch: Optional[Integer]): Unit = {
-      partition.fetchOffsetSnapshotOrError(currentLeaderEpoch, fetchOnlyFromLeader = true) match {
-        case Left(_) => assertEquals(Errors.NONE, expectedError)
-        case Right(error) => assertEquals(expectedError, error)
+      try {
+        partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true)
+        assertEquals(Errors.NONE, expectedError)
+      } catch {
+        case error: ApiException => assertEquals(expectedError, Errors.forException(error))
      }
} }
@ -262,9 +264,11 @@ class PartitionTest {
def assertSnapshotError(expectedError: Errors, def assertSnapshotError(expectedError: Errors,
currentLeaderEpoch: Optional[Integer], currentLeaderEpoch: Optional[Integer],
fetchOnlyLeader: Boolean): Unit = { fetchOnlyLeader: Boolean): Unit = {
-      partition.fetchOffsetSnapshotOrError(currentLeaderEpoch, fetchOnlyFromLeader = fetchOnlyLeader) match {
-        case Left(_) => assertEquals(expectedError, Errors.NONE)
-        case Right(error) => assertEquals(expectedError, error)
+      try {
+        partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = fetchOnlyLeader)
+        assertEquals(Errors.NONE, expectedError)
+      } catch {
+        case error: ApiException => assertEquals(expectedError, Errors.forException(error))
      }
} }
@ -1039,6 +1043,7 @@ class PartitionTest {
assertEquals(time.milliseconds(), remoteReplica.lastCaughtUpTimeMs) assertEquals(time.milliseconds(), remoteReplica.lastCaughtUpTimeMs)
assertEquals(6L, remoteReplica.logEndOffset) assertEquals(6L, remoteReplica.logEndOffset)
assertEquals(0L, remoteReplica.logStartOffset) assertEquals(0L, remoteReplica.logStartOffset)
} }
@Test @Test


@ -3693,6 +3693,37 @@ class LogTest {
assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset)) assertEquals(None, reopenedLog.firstUnstableOffset.map(_.messageOffset))
} }
@Test
def testOffsetSnapshot() {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)
val log = createLog(logDir, logConfig)
// append a few records
appendAsFollower(log, MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord("a".getBytes),
new SimpleRecord("b".getBytes),
new SimpleRecord("c".getBytes)), 5)
log.highWatermark = 2L
var offsets: LogOffsetSnapshot = log.offsetSnapshot
assertEquals(offsets.highWatermark.messageOffset, 2L)
assertFalse(offsets.highWatermark.messageOffsetOnly)
offsets = log.offsetSnapshot
assertEquals(offsets.highWatermark.messageOffset, 2L)
assertFalse(offsets.highWatermark.messageOffsetOnly)
try {
log.highWatermark = 100L
offsets = log.offsetSnapshot
fail("Should have thrown")
} catch {
case e: OffsetOutOfRangeException => // pass
case _ => fail("Should have seen OffsetOutOfRangeException")
}
}
@Test @Test
def testLastStableOffsetWithMixedProducerData() { def testLastStableOffsetWithMixedProducerData() {
val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5) val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024 * 5)


@ -205,7 +205,7 @@ class FetchRequestTest extends BaseRequestTest {
Seq(topicPartition))).build() Seq(topicPartition))).build()
val fetchResponse = sendFetchRequest(nonReplicaId, fetchRequest) val fetchResponse = sendFetchRequest(nonReplicaId, fetchRequest)
val partitionData = fetchResponse.responseData.get(topicPartition) val partitionData = fetchResponse.responseData.get(topicPartition)
-    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, partitionData.error)
+    assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionData.error)
} }
@Test @Test
@ -238,8 +238,8 @@ class FetchRequestTest extends BaseRequestTest {
// Check follower error codes // Check follower error codes
val followerId = TestUtils.findFollowerId(topicPartition, servers) val followerId = TestUtils.findFollowerId(topicPartition, servers)
-    assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.empty())
-    assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.of(secondLeaderEpoch))
+    assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.empty())
+    assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.of(secondLeaderEpoch))
assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch + 1)) assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch + 1))
assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1)) assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
} }


@ -50,6 +50,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
import EasyMock._ import EasyMock._
import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData} import org.apache.kafka.common.message.{HeartbeatRequestData, JoinGroupRequestData, OffsetCommitRequestData, OffsetCommitResponseData, SyncGroupRequestData}
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
import org.apache.kafka.common.replica.ClientMetadata
import org.junit.Assert.{assertEquals, assertNull, assertTrue} import org.junit.Assert.{assertEquals, assertNull, assertTrue}
import org.junit.{After, Test} import org.junit.{After, Test}
@ -464,14 +465,15 @@ class KafkaApisTest {
replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean, replicaManager.fetchMessages(anyLong, anyInt, anyInt, anyInt, anyBoolean,
anyObject[Seq[(TopicPartition, FetchRequest.PartitionData)]], anyObject[ReplicaQuota], anyObject[Seq[(TopicPartition, FetchRequest.PartitionData)]], anyObject[ReplicaQuota],
anyObject[Seq[(TopicPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel]) anyObject[Seq[(TopicPartition, FetchPartitionData)] => Unit](), anyObject[IsolationLevel],
anyObject[Option[ClientMetadata]])
expectLastCall[Unit].andAnswer(new IAnswer[Unit] { expectLastCall[Unit].andAnswer(new IAnswer[Unit] {
def answer: Unit = { def answer: Unit = {
val callback = getCurrentArguments.apply(7).asInstanceOf[(Seq[(TopicPartition, FetchPartitionData)] => Unit)] val callback = getCurrentArguments.apply(7).asInstanceOf[(Seq[(TopicPartition, FetchPartitionData)] => Unit)]
val records = MemoryRecords.withRecords(CompressionType.NONE, val records = MemoryRecords.withRecords(CompressionType.NONE,
new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8))) new SimpleRecord(timestamp, "foo".getBytes(StandardCharsets.UTF_8)))
callback(Seq(tp -> new FetchPartitionData(Errors.NONE, hw, 0, records, callback(Seq(tp -> new FetchPartitionData(Errors.NONE, hw, 0, records,
None, None))) None, None, Option.empty)))
} }
}) })


@ -655,6 +655,7 @@ class KafkaConfigTest {
case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaFetchWaitMaxMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaFetchMinBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaFetchResponseMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaFetchResponseMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaSelectorClassProp => // Ignore string
case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.NumReplicaFetchersProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")


@ -395,6 +395,7 @@ class ReplicaAlterLogDirsThreadTest {
EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.capture(responseCallback), EasyMock.capture(responseCallback),
EasyMock.anyObject(),
EasyMock.anyObject())) EasyMock.anyObject()))
.andAnswer(new IAnswer[Unit] { .andAnswer(new IAnswer[Unit] {
override def answer(): Unit = { override def answer(): Unit = {
@ -629,6 +630,7 @@ class ReplicaAlterLogDirsThreadTest {
EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.capture(responseCallback), EasyMock.capture(responseCallback),
EasyMock.anyObject(),
EasyMock.anyObject())) EasyMock.anyObject()))
.andAnswer(new IAnswer[Unit] { .andAnswer(new IAnswer[Unit] {
override def answer(): Unit = { override def answer(): Unit = {


@ -64,7 +64,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue, fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false, hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo, readPartitionInfo = fetchInfo,
quota = quota) quota = quota,
clientMetadata = None)
assertEquals("Given two partitions, with only one throttled, we should get the first", 1, assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
@ -89,7 +90,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue, fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false, hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo, readPartitionInfo = fetchInfo,
quota = quota) quota = quota,
clientMetadata = None)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0, assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
assertEquals("Given two partitions, with both throttled, we should get no messages", 0, assertEquals("Given two partitions, with both throttled, we should get no messages", 0,
@ -113,7 +115,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue, fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false, hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo, readPartitionInfo = fetchInfo,
quota = quota) quota = quota,
clientMetadata = None)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1, assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1, assertEquals("Given two partitions, with both non-throttled, we should get both messages", 1,
@ -137,7 +140,8 @@ class ReplicaManagerQuotasTest {
fetchMaxBytes = Int.MaxValue, fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false, hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo, readPartitionInfo = fetchInfo,
quota = quota) quota = quota,
clientMetadata = None)
assertEquals("Given two partitions, with only one throttled, we should get the first", 1, assertEquals("Given two partitions, with only one throttled, we should get the first", 1,
fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size) fetch.find(_._1 == topicPartition1).get._2.info.records.batches.asScala.size)
@ -167,6 +171,7 @@ class ReplicaManagerQuotasTest {
EasyMock.expect(replicaManager.shouldLeaderThrottle(EasyMock.anyObject[ReplicaQuota], EasyMock.anyObject[TopicPartition], EasyMock.anyObject[Int])) EasyMock.expect(replicaManager.shouldLeaderThrottle(EasyMock.anyObject[ReplicaQuota], EasyMock.anyObject[TopicPartition], EasyMock.anyObject[Int]))
.andReturn(!isReplicaInSync).anyTimes() .andReturn(!isReplicaInSync).anyTimes()
EasyMock.expect(partition.getReplica(1)).andReturn(None)
EasyMock.replay(replicaManager, partition) EasyMock.replay(replicaManager, partition)
val tp = new TopicPartition("t1", 0) val tp = new TopicPartition("t1", 0)
@ -179,9 +184,10 @@ class ReplicaManagerQuotasTest {
fetchIsolation = FetchLogEnd, fetchIsolation = FetchLogEnd,
isFromFollower = true, isFromFollower = true,
replicaId = 1, replicaId = 1,
fetchPartitionStatus = List((tp, fetchPartitionStatus))) fetchPartitionStatus = List((tp, fetchPartitionStatus))
)
new DelayedFetch(delayMs = 600, fetchMetadata = fetchMetadata, replicaManager = replicaManager, new DelayedFetch(delayMs = 600, fetchMetadata = fetchMetadata, replicaManager = replicaManager,
quota = null, responseCallback = null) { quota = null, clientMetadata = None, responseCallback = null) {
override def forceComplete(): Boolean = true override def forceComplete(): Boolean = true
} }
} }


@ -18,10 +18,13 @@
package kafka.server package kafka.server
import java.io.File import java.io.File
import java.util.{Optional, Properties} import java.net.InetAddress
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.{Optional, Properties}
import kafka.api.Request
import kafka.cluster.BrokerEndPoint
import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager} import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager}
import kafka.utils.{MockScheduler, MockTime, TestUtils} import kafka.utils.{MockScheduler, MockTime, TestUtils}
import TestUtils.createBroker import TestUtils.createBroker
@ -29,16 +32,22 @@ import kafka.cluster.BrokerEndPoint
import kafka.server.QuotaFactory.UnboundedQuota import kafka.server.QuotaFactory.UnboundedQuota
import kafka.server.checkpoints.LazyOffsetCheckpoints import kafka.server.checkpoints.LazyOffsetCheckpoints
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.TestUtils.createBroker
import kafka.utils.timer.MockTimer import kafka.utils.timer.MockTimer
import kafka.utils.{MockScheduler, MockTime, TestUtils}
import kafka.zk.KafkaZkClient import kafka.zk.KafkaZkClient
import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, IsolationLevel, LeaderAndIsrRequest} import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{EpochEndOffset, IsolationLevel, LeaderAndIsrRequest}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.zookeeper.data.Stat import org.apache.zookeeper.data.Stat
@ -179,12 +188,6 @@ class ReplicaManagerTest {
assertEquals(Errors.NOT_LEADER_FOR_PARTITION, response.error) assertEquals(Errors.NOT_LEADER_FOR_PARTITION, response.error)
} }
// Fetch some messages
val fetchResult = fetchAsConsumer(rm, new TopicPartition(topic, 0),
new PartitionData(0, 0, 100000, Optional.empty()),
minBytes = 100000)
assertFalse(fetchResult.isFired)
// Make this replica the follower // Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
collection.immutable.Map(new TopicPartition(topic, 0) -> collection.immutable.Map(new TopicPartition(topic, 0) ->
@ -193,7 +196,6 @@ class ReplicaManagerTest {
rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
assertTrue(appendResult.isFired) assertTrue(appendResult.isFired)
assertTrue(fetchResult.isFired)
} finally { } finally {
rm.shutdown(checkpointHW = false) rm.shutdown(checkpointHW = false)
} }
@ -515,7 +517,8 @@ class ReplicaManagerTest {
fetchInfos = Seq(tp -> validFetchPartitionData), fetchInfos = Seq(tp -> validFetchPartitionData),
quota = UnboundedQuota, quota = UnboundedQuota,
isolationLevel = IsolationLevel.READ_UNCOMMITTED, isolationLevel = IsolationLevel.READ_UNCOMMITTED,
responseCallback = callback responseCallback = callback,
clientMetadata = None
) )
assertTrue(successfulFetch.isDefined) assertTrue(successfulFetch.isDefined)
@ -537,7 +540,8 @@ class ReplicaManagerTest {
fetchInfos = Seq(tp -> invalidFetchPartitionData), fetchInfos = Seq(tp -> invalidFetchPartitionData),
quota = UnboundedQuota, quota = UnboundedQuota,
isolationLevel = IsolationLevel.READ_UNCOMMITTED, isolationLevel = IsolationLevel.READ_UNCOMMITTED,
responseCallback = callback responseCallback = callback,
clientMetadata = None
) )
assertTrue(successfulFetch.isDefined) assertTrue(successfulFetch.isDefined)
@ -617,7 +621,8 @@ class ReplicaManagerTest {
tp1 -> new PartitionData(1, 0, 100000, Optional.empty())), tp1 -> new PartitionData(1, 0, 100000, Optional.empty())),
quota = UnboundedQuota, quota = UnboundedQuota,
responseCallback = fetchCallback, responseCallback = fetchCallback,
isolationLevel = IsolationLevel.READ_UNCOMMITTED isolationLevel = IsolationLevel.READ_UNCOMMITTED,
clientMetadata = None
) )
val tp0Log = replicaManager.localLog(tp0) val tp0Log = replicaManager.localLog(tp0)
assertTrue(tp0Log.isDefined) assertTrue(tp0Log.isDefined)
@ -678,6 +683,157 @@ class ReplicaManagerTest {
EasyMock.verify(mockLogMgr) EasyMock.verify(mockLogMgr)
} }
@Test
def testReplicaSelector(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
val controllerId = 0
val leaderEpoch = 1
val leaderEpochIncrement = 2
val aliveBrokerIds = Seq[Integer] (followerBrokerId, leaderBrokerId)
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
val partition = replicaManager.createPartition(new TopicPartition(topic, topicPartition))
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
partition.createLogIfNotExists(leaderBrokerId, isNew = false, isFutureReplica = false, offsetCheckpoints)
partition.makeLeader(
controllerId,
leaderAndIsrPartitionState(leaderEpoch, leaderBrokerId, aliveBrokerIds),
correlationId,
offsetCheckpoints
)
val tp0 = new TopicPartition(topic, 0)
val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
// We expect to select the leader, which means we return None
val preferredReadReplica: Option[Int] = replicaManager.findPreferredReadReplica(
tp0, metadata, Request.OrdinaryConsumerId, 1L, System.currentTimeMillis)
assertFalse(preferredReadReplica.isDefined)
}
@Test
def testPreferredReplicaAsFollower(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
val leaderEpoch = 1
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
val brokerList = Seq[Integer](0, 1).asJava
val tp0 = new TopicPartition(topic, 0)
replicaManager.createPartition(new TopicPartition(topic, 0))
// Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
collection.immutable.Map(new TopicPartition(topic, 0) ->
new LeaderAndIsrRequest.PartitionState(0, 1, 1, brokerList, 0, brokerList, false)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
val consumerResult = fetchAsConsumer(replicaManager, tp0,
new PartitionData(0, 0, 100000, Optional.empty()),
clientMetadata = Some(metadata))
// Fetch from follower succeeds
assertTrue(consumerResult.isFired)
// But only leader will compute preferred replica
assertTrue(consumerResult.assertFired.preferredReadReplica.isEmpty)
}
@Test
def testPreferredReplicaAsLeader(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
val leaderEpoch = 1
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
// Prepare the mocked components for the test
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
val brokerList = Seq[Integer](0, 1).asJava
val tp0 = new TopicPartition(topic, 0)
replicaManager.createPartition(new TopicPartition(topic, 0))
// Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
collection.immutable.Map(new TopicPartition(topic, 0) ->
new LeaderAndIsrRequest.PartitionState(0, 0, 1, brokerList, 0, brokerList, false)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
val consumerResult = fetchAsConsumer(replicaManager, tp0,
new PartitionData(0, 0, 100000, Optional.empty()),
clientMetadata = Some(metadata))
// Fetch from follower succeeds
assertTrue(consumerResult.isFired)
// Returns a preferred replica (should just be the leader, which is None)
assertFalse(consumerResult.assertFired.preferredReadReplica.isDefined)
}
@Test(expected = classOf[ClassNotFoundException])
def testUnknownReplicaSelector(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
val leaderEpoch = 1
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
val props = new Properties()
props.put(KafkaConfig.ReplicaSelectorClassProp, "non-a-class")
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true, extraProps = props)
}
@Test
def testDefaultReplicaSelector(): Unit = {
val topicPartition = 0
val followerBrokerId = 0
val leaderBrokerId = 1
val leaderEpoch = 1
val leaderEpochIncrement = 2
val countDownLatch = new CountDownLatch(1)
val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(
topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
leaderBrokerId, countDownLatch, expectTruncation = true)
assertFalse(replicaManager.replicaSelectorOpt.isDefined)
}
/** /**
* This method assumes that the test using created ReplicaManager calls * This method assumes that the test using created ReplicaManager calls
* ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing * ReplicaManager.becomeLeaderOrFollower() once with LeaderAndIsrRequest containing
@ -688,9 +844,11 @@ class ReplicaManagerTest {
followerBrokerId: Int, followerBrokerId: Int,
leaderBrokerId: Int, leaderBrokerId: Int,
countDownLatch: CountDownLatch, countDownLatch: CountDownLatch,
expectTruncation: Boolean) : (ReplicaManager, LogManager) = { expectTruncation: Boolean,
extraProps: Properties = new Properties()) : (ReplicaManager, LogManager) = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
props.asScala ++= extraProps.asScala
val config = KafkaConfig.fromProps(props) val config = KafkaConfig.fromProps(props)
// Setup mock local log to have leader epoch of 3 and offset of 10 // Setup mock local log to have leader epoch of 3 and offset of 10
@ -749,9 +907,16 @@ class ReplicaManagerTest {
.andReturn(Option(createBroker(brokerId, s"host$brokerId", brokerId))) .andReturn(Option(createBroker(brokerId, s"host$brokerId", brokerId)))
.anyTimes .anyTimes
} }
EasyMock
.expect(metadataCache.getPartitionReplicaEndpoints(
EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(Map(
leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"),
followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap
)
.anyTimes()
EasyMock.replay(metadataCache) EasyMock.replay(metadataCache)
val timer = new MockTimer val timer = new MockTimer
val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce]( val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", timer, reaperEnabled = false) purgatoryName = "Produce", timer, reaperEnabled = false)
@ -860,16 +1025,18 @@ class ReplicaManagerTest {
partition: TopicPartition, partition: TopicPartition,
partitionData: PartitionData, partitionData: PartitionData,
minBytes: Int = 0, minBytes: Int = 0,
isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = { isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED,
fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel) clientMetadata: Option[ClientMetadata] = None): CallbackResult[FetchPartitionData] = {
fetchMessages(replicaManager, replicaId = -1, partition, partitionData, minBytes, isolationLevel, clientMetadata)
} }
private def fetchAsFollower(replicaManager: ReplicaManager, private def fetchAsFollower(replicaManager: ReplicaManager,
partition: TopicPartition, partition: TopicPartition,
partitionData: PartitionData, partitionData: PartitionData,
minBytes: Int = 0, minBytes: Int = 0,
isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED): CallbackResult[FetchPartitionData] = { isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED,
fetchMessages(replicaManager, replicaId = 1, partition, partitionData, minBytes, isolationLevel) clientMetadata: Option[ClientMetadata] = None): CallbackResult[FetchPartitionData] = {
fetchMessages(replicaManager, replicaId = 1, partition, partitionData, minBytes, isolationLevel, clientMetadata)
} }
private def fetchMessages(replicaManager: ReplicaManager, private def fetchMessages(replicaManager: ReplicaManager,
@ -877,7 +1044,8 @@ class ReplicaManagerTest {
partition: TopicPartition, partition: TopicPartition,
partitionData: PartitionData, partitionData: PartitionData,
minBytes: Int, minBytes: Int,
isolationLevel: IsolationLevel): CallbackResult[FetchPartitionData] = { isolationLevel: IsolationLevel,
clientMetadata: Option[ClientMetadata]): CallbackResult[FetchPartitionData] = {
val result = new CallbackResult[FetchPartitionData]() val result = new CallbackResult[FetchPartitionData]()
def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = { def fetchCallback(responseStatus: Seq[(TopicPartition, FetchPartitionData)]) = {
assertEquals(1, responseStatus.size) assertEquals(1, responseStatus.size)
@ -895,7 +1063,9 @@ class ReplicaManagerTest {
fetchInfos = Seq(partition -> partitionData), fetchInfos = Seq(partition -> partitionData),
quota = UnboundedQuota, quota = UnboundedQuota,
responseCallback = fetchCallback, responseCallback = fetchCallback,
isolationLevel = isolationLevel) isolationLevel = isolationLevel,
clientMetadata = clientMetadata
)
result result
} }


@ -178,7 +178,8 @@ class SimpleFetchTest {
fetchMaxBytes = Int.MaxValue, fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false, hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo, readPartitionInfo = fetchInfo,
quota = UnboundedQuota).find(_._1 == topicPartition) quota = UnboundedQuota,
clientMetadata = None).find(_._1 == topicPartition)
val firstReadRecord = readCommittedRecords.get._2.info.records.records.iterator.next() val firstReadRecord = readCommittedRecords.get._2.info.records.records.iterator.next()
assertEquals("Reading committed data should return messages only up to high watermark", recordToHW, assertEquals("Reading committed data should return messages only up to high watermark", recordToHW,
new SimpleRecord(firstReadRecord)) new SimpleRecord(firstReadRecord))
@ -190,7 +191,8 @@ class SimpleFetchTest {
fetchMaxBytes = Int.MaxValue, fetchMaxBytes = Int.MaxValue,
hardMaxBytesLimit = false, hardMaxBytesLimit = false,
readPartitionInfo = fetchInfo, readPartitionInfo = fetchInfo,
quota = UnboundedQuota).find(_._1 == topicPartition) quota = UnboundedQuota,
clientMetadata = None).find(_._1 == topicPartition)
val firstRecord = readAllRecords.get._2.info.records.records.iterator.next() val firstRecord = readAllRecords.get._2.info.records.records.iterator.next()
assertEquals("Reading any data can return messages up to the end of the log", recordToLEO, assertEquals("Reading any data can return messages up to the end of the log", recordToLEO,


@ -86,6 +86,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
jaas_override_variables A dict of variables to be used in the jaas.conf template file jaas_override_variables A dict of variables to be used in the jaas.conf template file
kafka_opts_override Override parameters of the KAFKA_OPTS environment variable kafka_opts_override Override parameters of the KAFKA_OPTS environment variable
client_prop_file_override Override client.properties file used by the consumer client_prop_file_override Override client.properties file used by the consumer
consumer_properties A dict of values to pass in as --consumer-property key=value
""" """
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=ConsoleConsumer.PERSISTENT_ROOT) root=ConsoleConsumer.PERSISTENT_ROOT)
@ -208,7 +209,7 @@ class ConsoleConsumer(KafkaPathResolverMixin, JmxMixin, BackgroundThreadService)
        if self.consumer_properties is not None:
            for k, v in self.consumer_properties.items():
-                cmd += "--consumer_properties %s=%s" % (k, v)
+                cmd += " --consumer-property %s=%s" % (k, v)
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
return cmd return cmd


@@ -94,7 +94,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None,
                  jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
-                 listener_security_config=ListenerSecurityConfig()):
+                 listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides={}):
         """
         :param context: test context
         :param ZookeeperService zk:
@@ -129,6 +129,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             self.server_prop_overides = []
         else:
             self.server_prop_overides = server_prop_overides
+        if per_node_server_prop_overrides is None:
+            self.per_node_server_prop_overrides = {}
+        else:
+            self.per_node_server_prop_overrides = per_node_server_prop_overrides
         self.log_level = "DEBUG"
         self.zk_chroot = zk_chroot
         self.listener_security_config = listener_security_config
@@ -295,6 +299,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         for prop in self.server_prop_overides:
             override_configs[prop[0]] = prop[1]
+        for prop in self.per_node_server_prop_overrides.get(self.idx(node), []):
+            override_configs[prop[0]] = prop[1]
+
         #update template configs with test override configs
         configs.update(override_configs)
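
A short sketch (not part of this diff) of how a test could use `per_node_server_prop_overrides`, e.g. to give each broker a distinct `broker.rack`. Keys are 1-based node indexes (matching `self.idx(node)`), values use the same `[key, value]` list form as `server_prop_overides`; the rack names, `test_context`, and `zk` here are assumed to come from the surrounding test.

```python
from kafkatest.services.kafka import KafkaService

kafka = KafkaService(test_context, num_nodes=3, zk=zk,
                     topics={"test-topic": {"partitions": 1, "replication-factor": 3}},
                     per_node_server_prop_overrides={
                         1: [["broker.rack", "rack-a"]],   # broker on node 1
                         2: [["broker.rack", "rack-b"]],   # broker on node 2
                         3: [["broker.rack", "rack-c"]],   # broker on node 3
                     })
```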

View File

@@ -27,9 +27,10 @@ class JmxMixin(object):
     - we assume the service using JmxMixin also uses KafkaPathResolverMixin
     - this uses the --wait option for JmxTool, so the list of object names must be explicit; no patterns are permitted
     """
-    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, root="/mnt"):
+    def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=None, jmx_poll_ms=1000, root="/mnt"):
         self.jmx_object_names = jmx_object_names
         self.jmx_attributes = jmx_attributes or []
+        self.jmx_poll_ms = jmx_poll_ms
         self.jmx_port = 9192

         self.started = [False] * num_nodes
@@ -71,7 +72,7 @@ class JmxMixin(object):
         if use_jmxtool_version <= V_0_11_0_0:
             use_jmxtool_version = DEV_BRANCH
         cmd = "%s %s " % (self.path.script("kafka-run-class.sh", use_jmxtool_version), self.jmx_class_name())
-        cmd += "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
+        cmd += "--reporting-interval %d --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % (self.jmx_poll_ms, self.jmx_port)
         cmd += " --wait"
         for jmx_object_name in self.jmx_object_names:
             cmd += " --object-name %s" % jmx_object_name
@@ -83,7 +84,7 @@ class JmxMixin(object):
         self.logger.debug("%s: Start JmxTool %d command: %s" % (node.account, idx, cmd))
         node.account.ssh(cmd, allow_fail=False)
-        wait_until(lambda: self._jmx_has_output(node), timeout_sec=20, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
+        wait_until(lambda: self._jmx_has_output(node), timeout_sec=30, backoff_sec=.5, err_msg="%s: Jmx tool took too long to start" % node.account)
         self.started[idx-1] = True

     def _jmx_has_output(self, node):
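
A minimal sketch (not part of this diff) of a service opting into a faster JMX sample rate via the new `jmx_poll_ms` argument, which now feeds JmxTool's `--reporting-interval`. The service class, object name, and attribute below are illustrative only.

```python
from ducktape.services.service import Service
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.kafka.util import fix_opts_for_new_jvm  # unused here; real services pull in more helpers

class MyMonitoredService(JmxMixin, Service):
    def __init__(self, context, num_nodes):
        # Sample every 500 ms instead of the 1000 ms default.
        JmxMixin.__init__(self, num_nodes=num_nodes,
                          jmx_object_names=["kafka.server:type=ReplicaManager,name=LeaderCount"],
                          jmx_attributes=["Value"],
                          jmx_poll_ms=500)
        Service.__init__(self, context, num_nodes)
```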

View File

@@ -21,7 +21,6 @@ from kafkatest.services.kafka import TopicPartition
 from kafkatest.services.verifiable_consumer import VerifiableConsumer

 class TruncationTest(VerifiableConsumerTest):
     TOPIC = "test_topic"
     NUM_PARTITIONS = 1