KAFKA-1215; Rack-Aware replica assignment option

Please see https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment for the overall design.

The update to TopicMetadataRequest/TopicMetadataResponse will be done in a different PR.

Author: Allen Wang <awang@netflix.com>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Grant Henke <granthenke@gmail.com>, Jun Rao <junrao@gmail.com>

Closes #132 from allenxwang/KAFKA-1215
Authored by Allen Wang on 2016-03-15 10:03:03 -07:00; committed by Jun Rao
parent deb2b004cb
commit 951e30adc6
27 changed files with 1003 additions and 253 deletions

View File

@ -697,8 +697,26 @@ public class Protocol {
public static final Schema UPDATE_METADATA_RESPONSE_V1 = UPDATE_METADATA_RESPONSE_V0;
public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1};
public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1};
public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V2 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V1;
public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V2 = UPDATE_METADATA_REQUEST_END_POINT_V1;
public static final Schema UPDATE_METADATA_REQUEST_BROKER_V2 =
new Schema(new Field("id", INT32, "The broker id."),
new Field("end_points", new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V2)),
new Field("rack", NULLABLE_STRING, "The rack"));
public static final Schema UPDATE_METADATA_REQUEST_V2 =
new Schema(new Field("controller_id", INT32, "The controller id."),
new Field("controller_epoch", INT32, "The controller epoch."),
new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V2)),
new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V2)));
public static final Schema UPDATE_METADATA_RESPONSE_V2 = UPDATE_METADATA_RESPONSE_V1;
public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2};
public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2};
/* an array of all requests and responses with all schema versions; a null value in the inner array means that the
* particular version is not supported */

View File

@ -311,7 +311,10 @@ public class Struct {
for (Object arrayItem: arrayObject)
result = prime * result + arrayItem.hashCode();
} else {
result = prime * result + this.get(f).hashCode();
Object field = this.get(f);
if (field != null) {
result = prime * result + field.hashCode();
}
}
}
return result;
@ -330,11 +333,13 @@ public class Struct {
return false;
for (int i = 0; i < this.values.length; i++) {
Field f = this.schema.get(i);
Boolean result;
boolean result;
if (f.type() instanceof ArrayOf) {
result = Arrays.equals((Object []) this.get(f), (Object []) other.get(f));
result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f));
} else {
result = this.get(f).equals(other.get(f));
Object thisField = this.get(f);
Object otherField = other.get(f);
result = (thisField == null && otherField == null) || thisField.equals(otherField);
}
if (!result)
return false;

View File

@ -49,16 +49,22 @@ public class UpdateMetadataRequest extends AbstractRequest {
this.zkVersion = zkVersion;
this.replicas = replicas;
}
}
public static final class Broker {
public final int id;
public final Map<SecurityProtocol, EndPoint> endPoints;
public final String rack;
public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) {
public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints, String rack) {
this.id = id;
this.endPoints = endPoints;
this.rack = rack;
}
@Deprecated
public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) {
this(id, endPoints, null);
}
}
@ -91,6 +97,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
// Broker key names
private static final String BROKER_ID_KEY_NAME = "id";
private static final String ENDPOINTS_KEY_NAME = "end_points";
private static final String RACK_KEY_NAME = "rack";
// EndPoint key names
private static final String HOST_KEY_NAME = "host";
@ -117,20 +124,20 @@ public class UpdateMetadataRequest extends AbstractRequest {
for (BrokerEndPoint brokerEndPoint : brokerEndPoints) {
Map<SecurityProtocol, EndPoint> endPoints = Collections.singletonMap(SecurityProtocol.PLAINTEXT,
new EndPoint(brokerEndPoint.host(), brokerEndPoint.port()));
brokers.add(new Broker(brokerEndPoint.id(), endPoints));
brokers.add(new Broker(brokerEndPoint.id(), endPoints, null));
}
return brokers;
}
/**
* Constructor for version 1.
* Constructor for version 2.
*/
public UpdateMetadataRequest(int controllerId, int controllerEpoch, Map<TopicPartition,
PartitionState> partitionStates, Set<Broker> liveBrokers) {
this(1, controllerId, controllerEpoch, partitionStates, liveBrokers);
this(2, controllerId, controllerEpoch, partitionStates, liveBrokers);
}
private UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map<TopicPartition,
public UpdateMetadataRequest(int version, int controllerId, int controllerEpoch, Map<TopicPartition,
PartitionState> partitionStates, Set<Broker> liveBrokers) {
super(new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)));
struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
@ -173,6 +180,9 @@ public class UpdateMetadataRequest extends AbstractRequest {
}
brokerData.set(ENDPOINTS_KEY_NAME, endPointsData.toArray());
if (version >= 2) {
brokerData.set(RACK_KEY_NAME, broker.rack);
}
}
brokersData.add(brokerData);
@ -226,8 +236,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
int port = brokerData.getInt(PORT_KEY_NAME);
Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>(1);
endPoints.put(SecurityProtocol.PLAINTEXT, new EndPoint(host, port));
liveBrokers.add(new Broker(brokerId, endPoints));
} else { // V1
liveBrokers.add(new Broker(brokerId, endPoints, null));
} else { // V1 or V2
Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>();
for (Object endPointDataObj : brokerData.getArray(ENDPOINTS_KEY_NAME)) {
Struct endPointData = (Struct) endPointDataObj;
@ -236,11 +246,13 @@ public class UpdateMetadataRequest extends AbstractRequest {
short protocolTypeId = endPointData.getShort(SECURITY_PROTOCOL_TYPE_KEY_NAME);
endPoints.put(SecurityProtocol.forId(protocolTypeId), new EndPoint(host, port));
}
liveBrokers.add(new Broker(brokerId, endPoints));
String rack = null;
if (brokerData.hasField(RACK_KEY_NAME)) { // V2
rack = brokerData.getString(RACK_KEY_NAME);
}
liveBrokers.add(new Broker(brokerId, endPoints, rack));
}
}
controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME);
this.partitionStates = partitionStates;
@ -249,15 +261,12 @@ public class UpdateMetadataRequest extends AbstractRequest {
@Override
public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
case 1:
if (versionId <= 2)
return new UpdateMetadataResponse(Errors.forException(e).code());
default:
else
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)));
}
}
public int controllerId() {
return controllerId;

View File

@ -86,8 +86,9 @@ public class RequestResponseTest {
createStopReplicaRequest(),
createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()),
createStopReplicaResponse(),
createUpdateMetadataRequest(1),
createUpdateMetadataRequest(1).getErrorResponse(1, new UnknownServerException()),
createUpdateMetadataRequest(2, "rack1"),
createUpdateMetadataRequest(2, null),
createUpdateMetadataRequest(2, "rack1").getErrorResponse(2, new UnknownServerException()),
createUpdateMetadataResponse(),
createLeaderAndIsrRequest(),
createLeaderAndIsrRequest().getErrorResponse(0, new UnknownServerException()),
@ -97,8 +98,11 @@ public class RequestResponseTest {
for (AbstractRequestResponse req : requestResponseList)
checkSerialization(req, null);
checkSerialization(createUpdateMetadataRequest(0), 0);
checkSerialization(createUpdateMetadataRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
checkSerialization(createUpdateMetadataRequest(0, null), 0);
checkSerialization(createUpdateMetadataRequest(0, null).getErrorResponse(0, new UnknownServerException()), 0);
checkSerialization(createUpdateMetadataRequest(1, null), 1);
checkSerialization(createUpdateMetadataRequest(1, "rack1"), 1);
checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(1, new UnknownServerException()), 1);
}
private void checkSerialization(AbstractRequestResponse req, Integer version) throws Exception {
@ -120,7 +124,7 @@ public class RequestResponseTest {
@Test
public void produceResponseVersionTest() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
ProduceResponse v0Response = new ProduceResponse(responseData);
ProduceResponse v1Response = new ProduceResponse(responseData, 10, 1);
@ -138,7 +142,7 @@ public class RequestResponseTest {
@Test
public void fetchResponseVersionTest() {
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
FetchResponse v0Response = new FetchResponse(responseData);
@ -192,14 +196,14 @@ public class RequestResponseTest {
}
private AbstractRequest createFetchRequest() {
Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<TopicPartition, FetchRequest.PartitionData>();
Map<TopicPartition, FetchRequest.PartitionData> fetchData = new HashMap<>();
fetchData.put(new TopicPartition("test1", 0), new FetchRequest.PartitionData(100, 1000000));
fetchData.put(new TopicPartition("test2", 0), new FetchRequest.PartitionData(200, 1000000));
return new FetchRequest(-1, 100, 100000, fetchData);
}
private AbstractRequestResponse createFetchResponse() {
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE.code(), 1000000, ByteBuffer.allocate(10)));
return new FetchResponse(responseData, 0);
}
@ -259,13 +263,13 @@ public class RequestResponseTest {
}
private AbstractRequest createListOffsetRequest() {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>();
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = new HashMap<>();
offsetData.put(new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10));
return new ListOffsetRequest(-1, offsetData);
}
private AbstractRequestResponse createListOffsetResponse() {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ListOffsetResponse.PartitionData(Errors.NONE.code(), Arrays.asList(100L)));
return new ListOffsetResponse(responseData);
}
@ -289,13 +293,13 @@ public class RequestResponseTest {
}
private AbstractRequest createOffsetCommitRequest() {
Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>();
Map<TopicPartition, OffsetCommitRequest.PartitionData> commitData = new HashMap<>();
commitData.put(new TopicPartition("test", 0), new OffsetCommitRequest.PartitionData(100, ""));
return new OffsetCommitRequest("group1", 100, "consumer1", 1000000, commitData);
}
private AbstractRequestResponse createOffsetCommitResponse() {
Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
Map<TopicPartition, Short> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), Errors.NONE.code());
return new OffsetCommitResponse(responseData);
}
@ -305,19 +309,19 @@ public class RequestResponseTest {
}
private AbstractRequestResponse createOffsetFetchResponse() {
Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new OffsetFetchResponse.PartitionData(100L, "", Errors.NONE.code()));
return new OffsetFetchResponse(responseData);
}
private AbstractRequest createProduceRequest() {
Map<TopicPartition, ByteBuffer> produceData = new HashMap<TopicPartition, ByteBuffer>();
Map<TopicPartition, ByteBuffer> produceData = new HashMap<>();
produceData.put(new TopicPartition("test", 0), ByteBuffer.allocate(10));
return new ProduceRequest((short) 1, 5000, produceData);
}
private AbstractRequestResponse createProduceResponse() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE.code(), 10000, Record.NO_TIMESTAMP));
return new ProduceResponse(responseData, 0);
}
@ -371,7 +375,7 @@ public class RequestResponseTest {
}
@SuppressWarnings("deprecation")
private AbstractRequest createUpdateMetadataRequest(int version) {
private AbstractRequest createUpdateMetadataRequest(int version, String rack) {
Map<TopicPartition, UpdateMetadataRequest.PartitionState> partitionStates = new HashMap<>();
List<Integer> isr = Arrays.asList(1, 2);
List<Integer> replicas = Arrays.asList(1, 2, 3, 4);
@ -397,11 +401,10 @@ public class RequestResponseTest {
endPoints2.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1244));
endPoints2.put(SecurityProtocol.SSL, new UpdateMetadataRequest.EndPoint("host2", 1234));
Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1),
new UpdateMetadataRequest.Broker(1, endPoints2)
Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1, rack),
new UpdateMetadataRequest.Broker(1, endPoints2, rack)
));
return new UpdateMetadataRequest(1, 10, partitionStates, liveBrokers);
return new UpdateMetadataRequest(version, 1, 10, partitionStates, liveBrokers);
}
}

View File

@ -19,7 +19,6 @@ package kafka.admin
import kafka.common._
import kafka.cluster.Broker
import kafka.log.LogConfig
import kafka.server.ConfigType
import kafka.utils._
@ -32,14 +31,12 @@ import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopi
import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
import org.apache.kafka.common.requests.MetadataResponse
import scala.Predef._
import scala.collection._
import scala.collection.JavaConverters._
import scala.collection.mutable
import JavaConverters._
import mutable.ListBuffer
import scala.collection.mutable
import collection.Map
import collection.Set
import org.I0Itec.zkclient.exception.ZkNodeExistsException
object AdminUtils extends Logging {
@ -48,11 +45,13 @@ object AdminUtils extends Logging {
val EntityConfigChangeZnodePrefix = "config_change_"
/**
* There are 2 goals of replica assignment:
* There are 3 goals of replica assignment:
*
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
* 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible.
*
* To achieve this goal, we:
* To achieve this goal for replica assignment without considering racks, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.
*
@ -64,39 +63,177 @@ object AdminUtils extends Logging {
* p8 p9 p5 p6 p7 (2nd replica)
* p3 p4 p0 p1 p2 (3rd replica)
* p7 p8 p9 p5 p6 (3rd replica)
*
* To create a rack aware assignment, this API will first create a rack alternated broker list. For example,
* from this brokerID -> rack mapping:
*
* 0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1"
*
* The rack alternated list will be:
*
* 0, 3, 1, 5, 4, 2
*
* Then an easy round-robin assignment can be applied. Assuming 6 partitions with a replication factor of 3, the assignment
* will be:
*
* 0 -> 0,3,1
* 1 -> 3,1,5
* 2 -> 1,5,4
* 3 -> 5,4,2
* 4 -> 4,2,0
* 5 -> 2,0,3
*
* Once it has completed the first round-robin, if there are more partitions to assign, the algorithm will start
* shifting the followers. This is to ensure we will not always get the same set of sequences.
* In this case, if there is another partition to assign (partition #6), the assignment will be:
*
* 6 -> 0,4,2 (instead of repeating 0,3,1 as in partition 0)
*
* The rack aware assignment always chooses the 1st replica of the partition using round robin on the rack alternated
* broker list. For the rest of the replicas, it will be biased towards brokers on racks that do not have
* any replica assignment, until every rack has a replica. Then the assignment will go back to round-robin on
* the broker list.
*
* As a result, if the number of replicas is equal to or greater than the number of racks, it will ensure that
* each rack will get at least one replica. Otherwise, each rack will get at most one replica. In a perfect
* situation where the number of replicas is the same as the number of racks and each rack has the same number of
* brokers, it guarantees that the replica distribution is even across brokers and racks.
*
* @return a Map from partition id to replica ids
* @throws AdminOperationException If rack information is supplied but it is incomplete, or if it is not possible to
* assign each replica to a unique rack.
*
*/
def assignReplicasToBrokers(brokerList: Seq[Int],
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1)
: Map[Int, Seq[Int]] = {
startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
if (nPartitions <= 0)
throw new AdminOperationException("number of partitions must be larger than 0")
if (replicationFactor <= 0)
throw new AdminOperationException("replication factor must be larger than 0")
if (replicationFactor > brokerList.size)
throw new AdminOperationException("replication factor: " + replicationFactor +
" larger than available brokers: " + brokerList.size)
val ret = new mutable.HashMap[Int, List[Int]]()
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
if (replicationFactor > brokerMetadatas.size)
throw new AdminOperationException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
if (brokerMetadatas.forall(_.rack.isEmpty))
assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
startPartitionId)
else {
if (brokerMetadatas.exists(_.rack.isEmpty))
throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
startPartitionId)
}
}
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
for (i <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
replicationFactor: Int,
brokerList: Seq[Int],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
val ret = mutable.Map[Int, Seq[Int]]()
val brokerArray = brokerList.toArray
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
var replicaList = List(brokerList(firstReplicaIndex))
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
ret.put(currentPartitionId, replicaList.reverse)
currentPartitionId = currentPartitionId + 1
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret.toMap
ret
}
private def assignReplicasToBrokersRackAware(nPartitions: Int,
replicationFactor: Int,
brokerMetadatas: Seq[BrokerMetadata],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
val brokerRackMap = brokerMetadatas.collect { case BrokerMetadata(id, Some(rack)) =>
id -> rack
}.toMap
val numRacks = brokerRackMap.values.toSet.size
val arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap)
val numBrokers = arrangedBrokerList.size
val ret = mutable.Map[Int, Seq[Int]]()
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
val leader = arrangedBrokerList(firstReplicaIndex)
val replicaBuffer = mutable.ArrayBuffer(leader)
val racksWithReplicas = mutable.Set(brokerRackMap(leader))
val brokersWithReplicas = mutable.Set(leader)
var k = 0
for (_ <- 0 until replicationFactor - 1) {
var done = false
while (!done) {
val broker = arrangedBrokerList(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size))
val rack = brokerRackMap(broker)
// Skip this broker if
// 1. there is already a broker in the same rack that has been assigned a replica AND one or more racks
// do not have any replica yet, or
// 2. the broker has already been assigned a replica AND one or more brokers do not have a replica assigned yet
if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
replicaBuffer += broker
racksWithReplicas += rack
brokersWithReplicas += broker
done = true
}
k += 1
}
}
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
/**
* Given broker and rack information, returns a list of brokers alternated by the rack. Assume
* this is the rack and its brokers:
*
* rack1: 0, 1, 2
* rack2: 3, 4, 5
* rack3: 6, 7, 8
*
* This API would return the list of 0, 3, 6, 1, 4, 7, 2, 5, 8
*
* This is essential to make sure that the assignReplicasToBrokers API can use such a list and
* assign replicas to brokers in a simple round-robin fashion, while ensuring an even
* distribution of leader and replica counts on each broker and that replicas are
* distributed to all racks.
*/
private[admin] def getRackAlternatedBrokerList(brokerRackMap: Map[Int, String]): IndexedSeq[Int] = {
val brokersIteratorByRack = getInverseMap(brokerRackMap).map { case (rack, brokers) =>
(rack, brokers.toIterator)
}
val racks = brokersIteratorByRack.keys.toArray.sorted
val result = new mutable.ArrayBuffer[Int]
var rackIndex = 0
while (result.size < brokerRackMap.size) {
val rackIterator = brokersIteratorByRack(racks(rackIndex))
if (rackIterator.hasNext)
result += rackIterator.next()
rackIndex = (rackIndex + 1) % racks.length
}
result
}
private[admin] def getInverseMap(brokerRackMap: Map[Int, String]): Map[String, Seq[Int]] = {
brokerRackMap.toSeq.map { case (id, rack) => (rack, id) }
.groupBy { case (rack, _) => rack }
.map { case (rack, rackAndIdList) => (rack, rackAndIdList.map { case (_, id) => id }.sorted) }
}
/**
* Add partitions to an existing topic with optional replica assignment
*
@ -110,7 +247,8 @@ object AdminUtils extends Logging {
topic: String,
numPartitions: Int = 1,
replicaAssignmentStr: String = "",
checkBrokerAvailable: Boolean = true) {
checkBrokerAvailable: Boolean = true,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
if (existingPartitionsReplicaList.size == 0)
throw new AdminOperationException("The topic %s does not exist".format(topic))
@ -124,16 +262,16 @@ object AdminUtils extends Logging {
throw new AdminOperationException("The number of partitions for a topic can only be increased")
// create the new partition replication list
val brokerList = zkUtils.getSortedBrokerList()
val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
var startIndex = brokerList.indexWhere(_ >= existingReplicaListForPartitionZero.head)
if(startIndex < 0) {
startIndex = 0
}
AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaListForPartitionZero.size, startIndex, existingPartitionsReplicaList.size)
val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
val newPartitionReplicaList =
if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
val startIndex = math.max(0, brokerMetadatas.indexWhere(_.id >= existingReplicaListForPartitionZero.head))
AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitionsToAdd, existingReplicaListForPartitionZero.size,
startIndex, existingPartitionsReplicaList.size)
}
else
getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size, checkBrokerAvailable)
getManualReplicaAssignment(replicaAssignmentStr, brokerMetadatas.map(_.id).toSet,
existingPartitionsReplicaList.size, checkBrokerAvailable)
// check if manual assignment has the right replication factor
val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size))
@ -237,13 +375,32 @@ object AdminUtils extends Logging {
def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
zkUtils.zkClient.exists(getTopicPath(topic))
def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
val allBrokers = zkUtils.getAllBrokersInCluster()
val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
val brokersWithRack = brokers.filter(_.rack.nonEmpty)
if (rackAwareMode == RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) {
throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" +
" to make replica assignment without rack information.")
}
val brokerMetadatas = rackAwareMode match {
case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None))
case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
brokers.map(broker => BrokerMetadata(broker.id, None))
case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
}
brokerMetadatas.sortBy(_.id)
}
def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties) {
val brokerList = zkUtils.getSortedBrokerList()
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}
@ -304,6 +461,7 @@ object AdminUtils extends Logging {
/**
* Update the config for a client and create a change notification so the change will propagate to other brokers
*
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param clientId: The clientId for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
@ -316,6 +474,7 @@ object AdminUtils extends Logging {
/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
*
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param topic: The topic for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or

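As a reference for the algorithm documented in the AdminUtils doc comment above, here is a minimal, self-contained Scala sketch of the rack-alternated broker list construction. It is illustrative only (the committed implementation is getRackAlternatedBrokerList); the broker-to-rack mapping is the one used in the example in that comment.

object RackAlternationSketch {
  // Build the rack-alternated list: group broker ids by rack, then take one broker
  // from each rack per pass (racks visited in sorted order) until all are consumed.
  def rackAlternatedBrokerList(brokerRackMap: Map[Int, String]): IndexedSeq[Int] = {
    val brokersByRack: Map[String, Iterator[Int]] =
      brokerRackMap.toSeq.groupBy { case (_, rack) => rack }.map { case (rack, pairs) =>
        rack -> pairs.map { case (id, _) => id }.sorted.iterator
      }
    val racks = brokersByRack.keys.toArray.sorted
    val result = scala.collection.mutable.ArrayBuffer.empty[Int]
    var rackIndex = 0
    while (result.size < brokerRackMap.size) {
      val brokerIterator = brokersByRack(racks(rackIndex))
      if (brokerIterator.hasNext)
        result += brokerIterator.next()
      rackIndex = (rackIndex + 1) % racks.length
    }
    result.toIndexedSeq
  }

  def main(args: Array[String]): Unit = {
    val mapping = Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1")
    println(rackAlternatedBrokerList(mapping)) // Vector(0, 3, 1, 5, 4, 2), matching the doc comment example
  }
}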
View File

@ -0,0 +1,23 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package kafka.admin
/**
* Broker metadata used by admin tools.
*
* @param id an integer that uniquely identifies this broker
* @param rack the rack of the broker, which is used in rack aware partition assignment for fault tolerance.
* Examples: "RACK1", "us-east-1d"
*/
case class BrokerMetadata(id: Int, rack: Option[String])

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
/**
* Mode to control how rack aware replica assignment will be executed
*/
object RackAwareMode {
/**
* Ignore all rack information in replica assignment. This is an optional mode used on the command line.
*/
case object Disabled extends RackAwareMode
/**
* Assume every broker has rack information, or none of them does. If only some brokers have rack information, fail fast
* in replica assignment. This is the default mode in command line tools (TopicCommand and ReassignPartitionsCommand).
*/
case object Enforced extends RackAwareMode
/**
* Use rack information if every broker has a rack. Otherwise, fall back to Disabled mode. This is used in auto topic
* creation.
*/
case object Safe extends RackAwareMode
}
sealed trait RackAwareMode

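The mode descriptions above map onto the getBrokerMetadatas logic shown earlier in the AdminUtils diff. A compact, illustrative Scala sketch of that decision (it reuses the committed BrokerMetadata and RackAwareMode types, but the helper itself is hypothetical, not part of the commit):

object RackAwareModeSketch {
  import kafka.admin.{AdminOperationException, BrokerMetadata, RackAwareMode}

  // How each mode resolves rack information when only some brokers have a rack set.
  def resolveRacks(brokers: Seq[BrokerMetadata], mode: RackAwareMode): Seq[BrokerMetadata] = {
    val withRack = brokers.count(_.rack.isDefined)
    val partialRackInfo = withRack > 0 && withRack < brokers.size
    mode match {
      case RackAwareMode.Disabled => brokers.map(_.copy(rack = None))           // ignore rack information
      case RackAwareMode.Safe if partialRackInfo => brokers.map(_.copy(rack = None)) // fall back quietly
      case RackAwareMode.Enforced if partialRackInfo =>
        throw new AdminOperationException("Not all brokers have rack information")   // fail fast
      case _ => brokers                                                          // all or none have racks: use as-is
    }
  }
}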
View File

@ -91,23 +91,33 @@ object ReassignPartitionsCommand extends Logging {
if (duplicateReassignments.nonEmpty)
throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(",")))
val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
val disableRackAware = opts.options.has(opts.disableRackAware)
val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware)
println("Current partition replica assignment\n\n%s".format(zkUtils.getPartitionReassignmentZkData(currentAssignments)))
println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(proposedAssignments)))
}
def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = {
val topicsToReassign = zkUtils.parseTopicsData(topicsToMoveJsonString)
val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
if (duplicateTopicsToReassign.nonEmpty)
throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
val topicPartitionsToReassign = zkUtils.getReplicaAssignmentForTopics(topicsToReassign)
val currentAssignment = zkUtils.getReplicaAssignmentForTopics(topicsToReassign)
var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]()
val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic)
groupedByTopic.foreach { topicInfo =>
val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size,
topicInfo._2.head._2.size)
partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
val groupedByTopic = currentAssignment.groupBy { case (tp, _) => tp.topic }
val rackAwareMode = if (disableRackAware) RackAwareMode.Disabled else RackAwareMode.Enforced
val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, rackAwareMode, Some(brokerListToReassign))
val partitionsToBeReassigned = mutable.Map[TopicAndPartition, Seq[Int]]()
groupedByTopic.foreach { case (topic, assignment) =>
val (_, replicas) = assignment.head
val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerMetadatas, assignment.size, replicas.size)
partitionsToBeReassigned ++= assignedReplicas.map { case (partition, replicas) =>
(TopicAndPartition(topic, partition) -> replicas)
}
val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic).toSeq)
println("Current partition replica assignment\n\n%s"
.format(zkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
}
(partitionsToBeReassigned, currentAssignment)
}
def executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) {
@ -200,6 +210,7 @@ object ReassignPartitionsCommand extends Logging {
.withRequiredArg
.describedAs("brokerlist")
.ofType(classOf[String])
val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.")

View File

@ -105,7 +105,9 @@ object TopicCommand extends Logging {
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
warnOnMaxMessagesChange(configs, replicas)
AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs)
val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
else RackAwareMode.Enforced
AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
}
println("Created topic \"%s\".".format(topic))
} catch {
@ -324,6 +326,7 @@ object TopicCommand extends Logging {
val ifNotExistsOpt = parser.accepts("if-not-exists",
"if set when creating topics, the action will only execute if the topic does not already exist")
val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
val options = parser.parse(args : _*)
val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)

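For completeness, an illustrative (not committed) Scala snippet showing the same topic creation done programmatically through the updated AdminUtils.createTopic signature; the ZooKeeper address and topic name are placeholders:

import java.util.Properties
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.utils.ZkUtils

object CreateRackAwareTopicSketch {
  def main(args: Array[String]): Unit = {
    val zkUtils = ZkUtils("localhost:2181", 30000, 30000, isZkSecurityEnabled = false)
    try {
      // New signature: topicConfig plus rackAwareMode. Enforced fails fast if only
      // some of the brokers carry rack information.
      AdminUtils.createTopic(zkUtils, "rack-aware-topic", 6, 3, new Properties(), RackAwareMode.Enforced)
    } finally zkUtils.close()
  }
}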
View File

@ -19,6 +19,7 @@ package kafka.cluster
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
import kafka.utils.Json
import org.apache.kafka.common.Node
@ -33,24 +34,39 @@ object Broker {
/**
* Create a broker object from id and JSON string.
*
* @param id
* @param brokerInfoString
*
* Version 1 JSON schema for a broker is:
* {"version":1,
* {
* "version":1,
* "host":"localhost",
* "port":9092
* "jmx_port":9999,
* "timestamp":"2233345666" }
* "timestamp":"2233345666"
* }
*
* The current JSON schema for a broker is:
* {"version":2,
* "host","localhost",
* "port",9092
* Version 2 JSON schema for a broker is:
* {
* "version":2,
* "host":"localhost",
* "port":9092
* "jmx_port":9999,
* "timestamp":"2233345666",
* "endpoints": ["PLAINTEXT://host1:9092",
* "SSL://host1:9093"]
* "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
* }
*
* Version 3 (current) JSON schema for a broker is:
* {
* "version":3,
* "host":"localhost",
* "port":9092
* "jmx_port":9999,
* "timestamp":"2233345666",
* "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
* "rack":"dc1"
* }
*/
def createBroker(id: Int, brokerInfoString: String): Broker = {
if (brokerInfoString == null)
@ -75,9 +91,8 @@ object Broker {
(ep.protocolType, ep)
}.toMap
}
new Broker(id, endpoints)
val rack = brokerInfo.get("rack").filter(_ != null).map(_.asInstanceOf[String])
new Broker(id, endpoints, rack)
case None =>
throw new BrokerNotAvailableException(s"Broker id $id does not exist")
}
@ -86,61 +101,34 @@ object Broker {
throw new KafkaException(s"Failed to parse the broker info from zookeeper: $brokerInfoString", t)
}
}
/**
*
* @param buffer Containing serialized broker.
* Current serialization is:
* id (int), number of endpoints (int), serialized endpoints
* @return broker object
*/
def readFrom(buffer: ByteBuffer): Broker = {
val id = buffer.getInt
val numEndpoints = buffer.getInt
val endpoints = List.range(0, numEndpoints).map(i => EndPoint.readFrom(buffer))
.map(ep => ep.protocolType -> ep).toMap
new Broker(id, endpoints)
}
}
case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint]) {
case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint], rack: Option[String]) {
override def toString: String = id + " : " + endPoints.values.mkString("(",",",")")
override def toString: String =
s"$id : ${endPoints.values.mkString("(",",",")")} : ${rack.orNull}"
def this(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) = {
this(id, endPoints, None)
}
def this(id: Int, host: String, port: Int, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
this(id, Map(protocol -> EndPoint(host, port, protocol)))
this(id, Map(protocol -> EndPoint(host, port, protocol)), None)
}
def this(bep: BrokerEndPoint, protocol: SecurityProtocol) = {
this(bep.id, bep.host, bep.port, protocol)
}
def writeTo(buffer: ByteBuffer) {
buffer.putInt(id)
buffer.putInt(endPoints.size)
for(endpoint <- endPoints.values) {
endpoint.writeTo(buffer)
}
}
def sizeInBytes: Int =
4 + /* broker id*/
4 + /* number of endPoints */
endPoints.values.map(_.sizeInBytes).sum /* end points */
def supportsChannel(protocolType: SecurityProtocol): Unit = {
endPoints.contains(protocolType)
}
def getNode(protocolType: SecurityProtocol): Node = {
val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id)))
val endpoint = endPoints.getOrElse(protocolType,
throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id"))
new Node(id, endpoint.host, endpoint.port)
}
def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = {
val endpoint = endPoints.getOrElse(protocolType, throw new BrokerEndPointNotAvailableException("End point %s not found for broker %d".format(protocolType,id)))
val endpoint = endPoints.getOrElse(protocolType,
throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id"))
new BrokerEndPoint(id, endpoint.host, endpoint.port)
}

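An illustrative check (not part of the commit) that a version 3 registration string, following the schema documented in the comment above, parses through Broker.createBroker with the rack populated; the host, endpoint, and rack values are placeholders:

import kafka.cluster.Broker

object ParseBrokerV3Sketch {
  def main(args: Array[String]): Unit = {
    val v3Json =
      """{"version":3,"host":"localhost","port":9092,"jmx_port":9999,"timestamp":"2233345666",
        |"endpoints":["PLAINTEXT://localhost:9092"],"rack":"dc1"}""".stripMargin
    val broker = Broker.createBroker(1, v3Json)
    println(broker.rack) // expected: Some(dc1)
  }
}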
View File

@ -16,24 +16,25 @@
*/
package kafka.controller
import kafka.api.{LeaderAndIsr, KAFKA_0_9_0, PartitionStateInfo}
import kafka.utils._
import org.apache.kafka.clients.{ClientResponse, ClientRequest, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, Node}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{LoginType, Selectable, ChannelBuilders, Selector, NetworkReceive, Mode}
import org.apache.kafka.common.protocol.{SecurityProtocol, ApiKeys}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import collection.mutable.HashMap
import java.net.SocketTimeoutException
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import kafka.api._
import kafka.cluster.Broker
import java.net.{SocketTimeoutException}
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
import kafka.server.KafkaConfig
import collection.mutable
import kafka.common.{KafkaException, TopicAndPartition}
import collection.Set
import collection.JavaConverters._
import kafka.server.KafkaConfig
import kafka.utils._
import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{BrokerEndPoint, Node, TopicPartition}
import scala.collection.JavaConverters._
import scala.collection.{Set, mutable}
import scala.collection.mutable.HashMap
class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
@ -380,7 +381,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
topicPartition -> partitionState
}
val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) (1: Short) else (0: Short)
val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2: Short
else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
else 0: Short
val updateMetadataRequest =
if (version == 0) {
@ -395,9 +398,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
}
new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava)
new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
}
new UpdateMetadataRequest(controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
}
controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)

View File

@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import java.lang.{Long => JLong, Short => JShort}
import java.util.Properties
import kafka.admin.AdminUtils
import kafka.admin.{RackAwareMode, AdminUtils}
import kafka.api._
import kafka.cluster.Partition
import kafka.common._
@ -624,7 +624,7 @@ class KafkaApis(val requestChannel: RequestChannel,
replicationFactor: Int,
properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
try {
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties)
AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, java.util.Collections.emptyList())

View File

@ -222,6 +222,8 @@ object KafkaConfig {
val MaxConnectionsPerIpProp = "max.connections.per.ip"
val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
/***************** rack configuration *************/
val RackProp = "broker.rack"
/** ********* Log Configuration ***********/
val NumPartitionsProp = "num.partitions"
val LogDirsProp = "log.dirs"
@ -388,6 +390,8 @@ object KafkaConfig {
val MaxConnectionsPerIpDoc = "The maximum number of connections we allow from each ip address"
val MaxConnectionsPerIpOverridesDoc = "Per-ip or hostname overrides to the default maximum number of connections"
val ConnectionsMaxIdleMsDoc = "Idle connections timeout: the server socket processor threads close the connections that idle more than this"
/************* Rack Configuration **************/
val RackDoc = "Rack of the broker. This will be used in rack aware replication assignment for fault tolerance. Examples: `RACK1`, `us-east-1d`"
/** ********* Log Configuration ***********/
val NumPartitionsDoc = "The default number of log partitions per topic"
val LogDirDoc = "The directory in which the log data is kept (supplemental for " + LogDirsProp + " property)"
@ -571,6 +575,9 @@ object KafkaConfig {
.define(MaxConnectionsPerIpOverridesProp, STRING, Defaults.MaxConnectionsPerIpOverrides, MEDIUM, MaxConnectionsPerIpOverridesDoc)
.define(ConnectionsMaxIdleMsProp, LONG, Defaults.ConnectionsMaxIdleMs, MEDIUM, ConnectionsMaxIdleMsDoc)
/************ Rack Configuration ******************/
.define(RackProp, STRING, null, MEDIUM, RackDoc)
/** ********* Log Configuration ***********/
.define(NumPartitionsProp, INT, Defaults.NumPartitions, atLeast(1), MEDIUM, NumPartitionsDoc)
.define(LogDirProp, STRING, Defaults.LogDir, HIGH, LogDirDoc)
@ -771,6 +778,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, getString(KafkaConfig.MaxConnectionsPerIpOverridesProp)).map { case (k, v) => (k, v.toInt)}
val connectionsMaxIdleMs = getLong(KafkaConfig.ConnectionsMaxIdleMsProp)
/***************** rack configuration **************/
val rack = Option(getString(KafkaConfig.RackProp))
/** ********* Log Configuration ***********/
val autoCreateTopicsEnable = getBoolean(KafkaConfig.AutoCreateTopicsEnableProp)
val numPartitions = getInt(KafkaConfig.NumPartitionsProp)

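A small illustrative snippet (not part of the commit) showing the new broker.rack property surfacing as config.rack; the property values are placeholders:

import java.util.Properties
import kafka.server.KafkaConfig

object BrokerRackConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // required by KafkaConfig
    props.put("broker.id", "0")
    props.put("broker.rack", "us-east-1d")           // new property introduced by this change
    val config = KafkaConfig.fromProps(props)
    println(config.rack)                             // expected: Some(us-east-1d)
  }
}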
View File

@ -17,12 +17,14 @@
package kafka.server
import java.net.InetAddress
import kafka.api.ApiVersion
import kafka.cluster.EndPoint
import kafka.utils._
import org.I0Itec.zkclient.IZkStateListener
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkStateListener, ZkClient, ZkConnection}
import java.net.InetAddress
/**
@ -35,7 +37,9 @@ import java.net.InetAddress
*/
class KafkaHealthcheck(private val brokerId: Int,
private val advertisedEndpoints: Map[SecurityProtocol, EndPoint],
private val zkUtils: ZkUtils) extends Logging {
private val zkUtils: ZkUtils,
private val rack: Option[String],
private val interBrokerProtocolVersion: ApiVersion) extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
val sessionExpireListener = new SessionExpireListener
@ -61,7 +65,8 @@ class KafkaHealthcheck(private val brokerId: Int,
// only PLAINTEXT is supported as default
// if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort, rack,
interBrokerProtocolVersion)
}
/**

View File

@ -239,7 +239,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
else
(protocol, endpoint)
}
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, config.interBrokerProtocolVersion)
kafkaHealthcheck.startup()
// Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it

View File

@ -159,7 +159,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
endPoints.put(protocol, EndPoint(ep.host, ep.port, protocol))
nodes.put(protocol, new Node(broker.id, ep.host, ep.port))
}
aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala)
aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala, Option(broker.rack))
aliveNodes(broker.id) = nodes.asScala
}

View File

@ -18,31 +18,26 @@
package kafka.utils
import java.util.concurrent.CountDownLatch
import kafka.admin._
import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0, LeaderAndIsr}
import kafka.cluster._
import kafka.common.{KafkaException, NoEpochForPartitionException, TopicAndPartition}
import kafka.consumer.{ConsumerThreadId, TopicCount}
import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
import kafka.server.ConfigType
import org.I0Itec.zkclient.{ZkClient,ZkConnection}
import org.I0Itec.zkclient.exception.{ZkException, ZkNodeExistsException, ZkNoNodeException,
ZkMarshallingError, ZkBadVersionException}
import kafka.utils.ZkUtils._
import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMarshallingError, ZkNoNodeException, ZkNodeExistsException}
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.zookeeper.ZooDefs
import scala.collection._
import kafka.api.LeaderAndIsr
import org.apache.zookeeper.data.{ACL, Stat}
import kafka.admin._
import kafka.common.{KafkaException, NoEpochForPartitionException}
import kafka.controller.ReassignedPartitionsContext
import kafka.controller.KafkaController
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.common.TopicAndPartition
import kafka.utils.ZkUtils._
import org.apache.zookeeper.AsyncCallback.{DataCallback,StringCallback}
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}
import scala.collection._
object ZkUtils {
val ConsumersPath = "/consumers"
@ -256,19 +251,43 @@ class ZkUtils(val zkClient: ZkClient,
}
/**
* Register brokers with v2 json format (which includes multiple endpoints).
* Register brokers with v3 JSON format (which includes multiple endpoints and rack) if
* the apiVersion is 0.10.0.X or above. Register the broker with v2 JSON format otherwise.
* Due to KAFKA-3100, 0.9.0.0 brokers and old clients will break if the JSON version is above 2.
* We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X without having to upgrade
* to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case).
*
* This format also includes default endpoints for compatibility with older clients.
* @param id
* @param host
* @param port
* @param advertisedEndpoints
* @param jmxPort
*
* @param id broker ID
* @param host broker host name
* @param port broker port
* @param advertisedEndpoints broker end points
* @param jmxPort jmx port
* @param rack broker rack
* @param apiVersion Kafka version the broker is running as
*/
def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], jmxPort: Int) {
def registerBrokerInZk(id: Int,
host: String,
port: Int,
advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint],
jmxPort: Int,
rack: Option[String],
apiVersion: ApiVersion) {
val brokerIdPath = BrokerIdsPath + "/" + id
val timestamp = SystemTime.milliseconds.toString
val brokerInfo = Json.encode(Map("version" -> 2, "host" -> host, "port" -> port, "endpoints"->advertisedEndpoints.values.map(_.connectionString).toArray, "jmx_port" -> jmxPort, "timestamp" -> timestamp))
val version = if (apiVersion >= KAFKA_0_10_0_IV0) 3 else 2
var jsonMap = Map("version" -> version,
"host" -> host,
"port" -> port,
"endpoints" -> advertisedEndpoints.values.map(_.connectionString).toArray,
"jmx_port" -> jmxPort,
"timestamp" -> timestamp
)
rack.foreach(rack => if (version >= 3) jsonMap += ("rack" -> rack))
val brokerInfo = Json.encode(jsonMap)
registerBrokerInZk(brokerIdPath, brokerInfo)
info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
@ -745,6 +764,7 @@ class ZkUtils(val zkClient: ZkClient,
/**
* This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker
* or throws an exception if the broker dies before the query to zookeeper finishes
*
* @param brokerId The broker id
* @return An optional Broker object encapsulating the broker metadata
*/
@ -768,7 +788,6 @@ class ZkUtils(val zkClient: ZkClient,
case e: ZkNoNodeException => {
createParentPath(BrokerSequenceIdPath, acls)
try {
import scala.collection.JavaConversions._
zkClient.createPersistent(BrokerSequenceIdPath, "", acls)
0
} catch {
@ -880,7 +899,6 @@ class ZKConfig(props: VerifiableProperties) {
object ZkPath {
@volatile private var isNamespacePresent: Boolean = false
import scala.collection.JavaConversions._
def checkNamespace(client: ZkClient) {
if(isNamespacePresent)

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.util.Properties
import kafka.admin.{RackAwareMode, AdminUtils, RackAwareTest}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.Assert._
import org.junit.Test
import scala.collection.Map
class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwareTest {
val numServers = 4
val numPartitions = 8
val replicationFactor = 2
val overridingProps = new Properties()
overridingProps.put(KafkaConfig.NumPartitionsProp, numPartitions.toString)
overridingProps.put(KafkaConfig.DefaultReplicationFactorProp, replicationFactor.toString)
def generateConfigs() =
(0 until numServers) map { node =>
TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString))
} map (KafkaConfig.fromProps(_, overridingProps))
private val topic = "topic"
@Test
def testAutoCreateTopic() {
val producer = TestUtils.createNewProducer(brokerList, retries = 5)
try {
// Send a message to auto-create the topic
val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
assertEquals("Should have offset 0", 0L, producer.send(record).get.offset)
// double-check that the topic was created and a leader was elected
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
val assignment = zkUtils.getReplicaAssignmentForTopics(Seq(topic)).map { case (topicPartition, replicas) =>
topicPartition.partition -> replicas
}
val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap)
checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor)
} finally producer.close()
}
}

View File

@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.utils.{Logging, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import org.junit.Assert._
import org.junit.Test
import scala.collection.{Map, Seq}
class AdminRackAwareTest extends RackAwareTest with Logging {
@Test
def testGetRackAlternatedBrokerListAndAssignReplicasToBrokers() {
val rackMap = Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3", 3 -> "rack2", 4 -> "rack2", 5 -> "rack1")
val newList = AdminUtils.getRackAlternatedBrokerList(rackMap)
assertEquals(List(0, 3, 1, 5, 4, 2), newList)
val anotherList = AdminUtils.getRackAlternatedBrokerList(rackMap - 5)
assertEquals(List(0, 3, 1, 4, 2), anotherList)
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(rackMap), 7, 3, 0, 0)
val expected = Map(0 -> List(0, 3, 1),
1 -> List(3, 1, 5),
2 -> List(1, 5, 4),
3 -> List(5, 4, 2),
4 -> List(4, 2, 0),
5 -> List(2, 0, 3),
6 -> List(0, 4, 2))
assertEquals(expected, assignment)
}
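The expected assignment above comes from alternating across racks. A minimal sketch of how such a rack-alternated broker list can be derived (the helper name and body are illustrative, not the actual AdminUtils.getRackAlternatedBrokerList implementation, but they are consistent with the assertions in this test):

import scala.collection.mutable.ArrayBuffer

// Sketch only: group broker ids by rack, sort racks and ids, then take one broker
// from each rack per round until every rack's iterator is exhausted.
def rackAlternatedBrokerList(brokerRackMap: Map[Int, String]): List[Int] = {
  val iteratorsByRack = brokerRackMap.groupBy { case (_, rack) => rack }.map {
    case (rack, entries) => rack -> entries.keys.toSeq.sorted.iterator
  }
  val rackOrder = iteratorsByRack.keys.toSeq.sorted
  val result = ArrayBuffer.empty[Int]
  while (result.size < brokerRackMap.size)
    for (rack <- rackOrder; it = iteratorsByRack(rack); if it.hasNext)
      result += it.next()
  result.toList
}

// rackAlternatedBrokerList(Map(0 -> "rack1", 1 -> "rack3", 2 -> "rack3",
//                              3 -> "rack2", 4 -> "rack2", 5 -> "rack1"))
// yields List(0, 3, 1, 5, 4, 2), matching the first assertion above.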
@Test
def testAssignmentWithRackAware() {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 6
val replicationFactor = 3
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
replicationFactor, 2, 0)
checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
replicationFactor)
}
@Test
def testAssignmentWithRackAwareWithRandomStartIndex() {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 6
val replicationFactor = 3
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
replicationFactor)
checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
replicationFactor)
}
@Test
def testAssignmentWithRackAwareWithUnevenReplicas() {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 13
val replicationFactor = 3
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
replicationFactor, 0, 0)
checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
replicationFactor, verifyLeaderDistribution = false, verifyReplicasDistribution = false)
}
@Test
def testAssignmentWithRackAwareWithUnevenRacks() {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 12
val replicationFactor = 3
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
replicationFactor)
checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
replicationFactor, verifyReplicasDistribution = false)
}
@Test
def testAssignmentWith2ReplicasRackAware() {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 12
val replicationFactor = 2
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
replicationFactor)
checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
replicationFactor)
}
@Test
def testRackAwareExpansion() {
val brokerRackMapping = Map(6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 -> "rack3", 10 -> "rack3", 11 -> "rack1")
val numPartitions = 12
val replicationFactor = 2
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
replicationFactor, startPartitionId = 12)
checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
replicationFactor)
}
@Test
def testAssignmentWith2ReplicasRackAwareWith6Partitions() {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1")
val numPartitions = 6
val replicationFactor = 2
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
replicationFactor)
checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
replicationFactor)
}
@Test
def testAssignmentWith2ReplicasRackAwareWith6PartitionsAnd3Brokers() {
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 4 -> "rack3")
val numPartitions = 3
val replicationFactor = 2
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions, replicationFactor)
}
@Test
def testLargeNumberPartitionsAssignment() {
val numPartitions = 96
val replicationFactor = 3
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack1",
6 -> "rack1", 7 -> "rack2", 8 -> "rack2", 9 -> "rack3", 10 -> "rack1", 11 -> "rack3")
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
replicationFactor)
checkReplicaDistribution(assignment, brokerRackMapping, brokerRackMapping.size, numPartitions,
replicationFactor)
}
@Test
def testMoreReplicasThanRacks() {
val numPartitions = 6
val replicationFactor = 5
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2")
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
val distribution = getReplicaDistribution(assignment, brokerRackMapping)
for (partition <- 0 until numPartitions)
assertEquals(3, distribution.partitionRacks(partition).toSet.size)
}
@Test
def testLessReplicasThanRacks() {
val numPartitions = 6
val replicationFactor = 2
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack3", 4 -> "rack3", 5 -> "rack2")
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions,
replicationFactor)
assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
val distribution = getReplicaDistribution(assignment, brokerRackMapping)
for (partition <- 0 to 5)
assertEquals(2, distribution.partitionRacks(partition).toSet.size)
}
@Test
def testSingleRack() {
val numPartitions = 6
val replicationFactor = 3
val brokerRackMapping = Map(0 -> "rack1", 1 -> "rack1", 2 -> "rack1", 3 -> "rack1", 4 -> "rack1", 5 -> "rack1")
val assignment = AdminUtils.assignReplicasToBrokers(toBrokerMetadata(brokerRackMapping), numPartitions, replicationFactor)
assertEquals(List.fill(assignment.size)(replicationFactor), assignment.values.map(_.size))
val distribution = getReplicaDistribution(assignment, brokerRackMapping)
for (partition <- 0 until numPartitions)
assertEquals(1, distribution.partitionRacks(partition).toSet.size)
for (broker <- brokerRackMapping.keys)
assertEquals(1, distribution.brokerLeaderCount(broker))
}
@Test
def testSkipBrokerWithReplicaAlreadyAssigned() {
val rackInfo = Map(0 -> "a", 1 -> "b", 2 -> "c", 3 -> "a", 4 -> "a")
val brokerList = 0 to 4
val numPartitions = 6
val replicationFactor = 4
val brokerMetadatas = toBrokerMetadata(rackInfo)
assertEquals(brokerList, brokerMetadatas.map(_.id))
val assignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, numPartitions, replicationFactor,
fixedStartIndex = 2)
checkReplicaDistribution(assignment, rackInfo, 5, 6, 4,
verifyRackAware = false, verifyLeaderDistribution = false, verifyReplicasDistribution = false)
}
}

View File

@ -33,20 +33,20 @@ import TestUtils._
import scala.collection.{Map, immutable}
class AdminTest extends ZooKeeperTestHarness with Logging {
class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
@Test
def testReplicaAssignment() {
val brokerList = List(0, 1, 2, 3, 4)
val brokerMetadatas = (0 to 4).map(new BrokerMetadata(_, None))
// test 0 replication factor
intercept[AdminOperationException] {
AdminUtils.assignReplicasToBrokers(brokerList, 10, 0)
AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 0)
}
// test wrong replication factor
intercept[AdminOperationException] {
AdminUtils.assignReplicasToBrokers(brokerList, 10, 6)
AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 6)
}
// correct assignment
@ -62,9 +62,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
8 -> List(3, 0, 1),
9 -> List(4, 1, 2))
val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
val e = (expectedAssignment.toList == actualAssignment.toList)
assertTrue(expectedAssignment.toList == actualAssignment.toList)
val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, 10, 3, 0)
assertEquals(expectedAssignment, actualAssignment)
}
@Test
@ -314,7 +313,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
val partition = 1
val preferredReplica = 0
// create brokers
val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false).map(KafkaConfig.fromProps)
val brokerRack = Map(0 -> "rack0", 1 -> "rack1", 2 -> "rack2")
val serverConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false, rackInfo = brokerRack).map(KafkaConfig.fromProps)
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
val servers = serverConfigs.reverseMap(s => TestUtils.createServer(s))
@ -452,4 +452,35 @@ class AdminTest extends ZooKeeperTestHarness with Logging {
server.config.logDirs.foreach(CoreUtils.rm(_))
}
}
@Test
def testGetBrokerMetadatas() {
// broker 4 has no rack information
val brokerList = 0 to 5
val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 5 -> "rack3")
val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = brokerList.filterNot(rackInfo.keySet))
TestUtils.createBrokersInZk(brokerMetadatas, zkUtils)
val processedMetadatas1 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Disabled)
assertEquals(brokerList, processedMetadatas1.map(_.id))
assertEquals(List.fill(brokerList.size)(None), processedMetadatas1.map(_.rack))
val processedMetadatas2 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Safe)
assertEquals(brokerList, processedMetadatas2.map(_.id))
assertEquals(List.fill(brokerList.size)(None), processedMetadatas2.map(_.rack))
intercept[AdminOperationException] {
AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
}
val partialList = List(0, 1, 2, 3, 5)
val processedMetadatas3 = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced, Some(partialList))
assertEquals(partialList, processedMetadatas3.map(_.id))
assertEquals(partialList.map(rackInfo), processedMetadatas3.flatMap(_.rack))
val numPartitions = 3
AdminUtils.createTopic(zkUtils, "foo", numPartitions, 2, rackAwareMode = RackAwareMode.Safe)
val assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo"))
assertEquals(numPartitions, assignment.size)
}
}
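The three RackAwareMode values exercised by testGetBrokerMetadatas can be summarised in a small sketch (a hypothetical helper, not the real AdminUtils.getBrokerMetadatas): Enforced rejects a broker set where only some brokers carry rack information, Safe silently drops rack information in that case, and Disabled always ignores it.

// Sketch only, assuming the BrokerMetadata(id, rack: Option[String]) shape used in these tests.
def resolveRackInfo(brokers: Seq[BrokerMetadata], mode: RackAwareMode): Seq[BrokerMetadata] = {
  val partialRackInfo = brokers.exists(_.rack.isEmpty) && brokers.exists(_.rack.isDefined)
  mode match {
    case RackAwareMode.Disabled => brokers.map(b => BrokerMetadata(b.id, None))
    case RackAwareMode.Safe if partialRackInfo => brokers.map(b => BrokerMetadata(b.id, None))
    case RackAwareMode.Enforced if partialRackInfo =>
      throw new AdminOperationException("Not all brokers have rack information")
    case _ => brokers
  }
}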

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import scala.collection.{Map, Seq, mutable}
import org.junit.Assert._
trait RackAwareTest {
def checkReplicaDistribution(assignment: Map[Int, Seq[Int]],
brokerRackMapping: Map[Int, String],
numBrokers: Int,
numPartitions: Int,
replicationFactor: Int,
verifyRackAware: Boolean = true,
verifyLeaderDistribution: Boolean = true,
verifyReplicasDistribution: Boolean = true) {
// always verify that no broker is assigned more than one replica of the same partition
for ((_, brokerList) <- assignment) {
assertEquals("More than one replica is assigned to same broker for the same partition", brokerList.toSet.size, brokerList.size)
}
val distribution = getReplicaDistribution(assignment, brokerRackMapping)
if (verifyRackAware) {
val partitionRackMap = distribution.partitionRacks
assertEquals("More than one replica of the same partition is assigned to the same rack",
List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.distinct.size))
}
if (verifyLeaderDistribution) {
val leaderCount = distribution.brokerLeaderCount
val leaderCountPerBroker = numPartitions / numBrokers
assertEquals("Preferred leader count is not even for brokers", List.fill(numBrokers)(leaderCountPerBroker), leaderCount.values.toList)
}
if (verifyReplicasDistribution) {
val replicasCount = distribution.brokerReplicasCount
val numReplicasPerBroker = numPartitions * replicationFactor / numBrokers
assertEquals("Replica count is not even for broker", List.fill(numBrokers)(numReplicasPerBroker), replicasCount.values.toList)
}
}
def getReplicaDistribution(assignment: Map[Int, Seq[Int]], brokerRackMapping: Map[Int, String]): ReplicaDistributions = {
val leaderCount = mutable.Map[Int, Int]()
val partitionCount = mutable.Map[Int, Int]()
val partitionRackMap = mutable.Map[Int, List[String]]()
assignment.foreach { case (partitionId, replicaList) =>
val leader = replicaList.head
leaderCount(leader) = leaderCount.getOrElse(leader, 0) + 1
for (brokerId <- replicaList) {
partitionCount(brokerId) = partitionCount.getOrElse(brokerId, 0) + 1
val rack = brokerRackMapping.getOrElse(brokerId, sys.error(s"No mapping found for $brokerId in `brokerRackMapping`"))
partitionRackMap(partitionId) = rack :: partitionRackMap.getOrElse(partitionId, List())
}
}
ReplicaDistributions(partitionRackMap, leaderCount, partitionCount)
}
def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] = Seq.empty): Seq[BrokerMetadata] =
rackMap.toSeq.map { case (brokerId, rack) =>
BrokerMetadata(brokerId, Some(rack))
} ++ brokersWithoutRack.map { brokerId =>
BrokerMetadata(brokerId, None)
}.sortBy(_.id)
}
case class ReplicaDistributions(partitionRacks: Map[Int, Seq[String]], brokerLeaderCount: Map[Int, Int], brokerReplicasCount: Map[Int, Int])

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import kafka.utils.{Logging, TestUtils}
import kafka.zk.ZooKeeperTestHarness
import org.junit.Test
class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
@Test
def testRackAwareReassign() {
val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils)
val numPartitions = 18
val replicationFactor = 3
// first create a topic with a non-rack-aware assignment
val createOpts = new kafka.admin.TopicCommand.TopicCommandOptions(Array(
"--partitions", numPartitions.toString,
"--replication-factor", replicationFactor.toString,
"--disable-rack-aware",
"--topic", "foo"))
kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
rackInfo.keys.toSeq.sorted, topicJson, disableRackAware = false)
val assignment = proposedAssignment map { case (topicPartition, replicas) =>
(topicPartition.partition, replicas)
}
checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
}
}

View File

@ -27,7 +27,7 @@ import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils._
import kafka.coordinator.GroupCoordinator
class TopicCommandTest extends ZooKeeperTestHarness with Logging {
class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
@Test
def testConfigPreservationAcrossPartitionAlteration() {
@ -157,4 +157,34 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging {
Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic, "--if-not-exists"))
TopicCommand.createTopic(zkUtils, createNotExistsOpts)
}
@Test
def testCreateAlterTopicWithRackAware() {
val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3")
TestUtils.createBrokersInZk(toBrokerMetadata(rackInfo), zkUtils)
val numPartitions = 18
val replicationFactor = 3
val createOpts = new TopicCommandOptions(Array(
"--partitions", numPartitions.toString,
"--replication-factor", replicationFactor.toString,
"--topic", "foo"))
TopicCommand.createTopic(zkUtils, createOpts)
var assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
tp.partition -> replicas
}
checkReplicaDistribution(assignment, rackInfo, rackInfo.size, numPartitions, replicationFactor)
val alteredNumPartitions = 36
// verify that adding partitions is also rack-aware
val alterOpts = new TopicCommandOptions(Array(
"--partitions", alteredNumPartitions.toString,
"--topic", "foo"))
TopicCommand.alterTopic(zkUtils, alterOpts)
assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
tp.partition -> replicas
}
checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor)
}
}

View File

@ -27,20 +27,6 @@ import scala.collection.mutable
class BrokerEndPointTest extends Logging {
@Test
def testSerDe() {
val endpoint = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
val listEndPoints = Map(SecurityProtocol.PLAINTEXT -> endpoint)
val origBroker = new Broker(1, listEndPoints)
val brokerBytes = ByteBuffer.allocate(origBroker.sizeInBytes)
origBroker.writeTo(brokerBytes)
val newBroker = Broker.readFrom(brokerBytes.flip().asInstanceOf[ByteBuffer])
assert(origBroker == newBroker)
}
@Test
def testHashAndEquals() {
val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)

View File

@ -530,7 +530,7 @@ class KafkaConfigTest {
case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
case KafkaConfig.MetricReporterClassesProp => // ignore string
case KafkaConfig.RackProp => // ignore string
//SSL Configs
case KafkaConfig.PrincipalBuilderClassProp =>
case KafkaConfig.SslProtocolProp => // ignore string

View File

@ -27,29 +27,27 @@ import java.security.cert.X509Certificate
import javax.net.ssl.X509TrustManager
import charset.Charset
import kafka.security.auth.{Resource, Authorizer, Acl}
import kafka.security.auth.{Acl, Authorizer, Resource}
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.utils.Utils._
import org.apache.kafka.test.TestSslUtils
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import kafka.server._
import kafka.producer._
import kafka.message._
import kafka.api._
import kafka.cluster.Broker
import kafka.consumer.{ConsumerTimeoutException, KafkaStream, ConsumerConfig}
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.cluster.{Broker, EndPoint}
import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream}
import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
import kafka.common.TopicAndPartition
import kafka.admin.AdminUtils
import kafka.producer.ProducerConfig
import kafka.log._
import kafka.utils.ZkUtils._
import org.junit.Assert._
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.{RangeAssignor, KafkaConsumer}
import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.network.Mode
@ -154,11 +152,12 @@ object TestUtils extends Logging {
enablePlaintext: Boolean = true,
enableSsl: Boolean = false,
enableSaslPlaintext: Boolean = false,
enableSaslSsl: Boolean = false): Seq[Properties] = {
enableSaslSsl: Boolean = false,
rackInfo: Map[Int, String] = Map()): Seq[Properties] = {
(0 until numConfigs).map { node =>
createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort,
interBrokerSecurityProtocol, trustStoreFile, enablePlaintext = enablePlaintext, enableSsl = enableSsl,
enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl)
enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl, rack = rackInfo.get(node))
}
}
@ -180,7 +179,7 @@ object TestUtils extends Logging {
enablePlaintext: Boolean = true,
enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort,
enableSsl: Boolean = false, sslPort: Int = RandomPort,
enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort)
enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort, rack: Option[String] = None)
: Properties = {
def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol)
@ -210,6 +209,7 @@ object TestUtils extends Logging {
props.put("delete.topic.enable", enableDeleteTopic.toString)
props.put("controlled.shutdown.retry.backoff.ms", "100")
props.put("log.cleaner.dedupe.buffer.size", "2097152")
rack.foreach(props.put("broker.rack", _))
if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })
props.putAll(sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId"))
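For reference, the rack passed in here ends up as the new broker.rack server property (presumably the KafkaConfig.RackProp seen in the config test above); a hypothetical server.properties entry would be:

broker.id=0
broker.rack=us-east-1a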
@ -591,9 +591,16 @@ object TestUtils extends Logging {
}
}
def createBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] = {
val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT))
brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1))
def createBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] =
createBrokersInZk(ids.map(kafka.admin.BrokerMetadata(_, None)), zkUtils)
def createBrokersInZk(brokerMetadatas: Seq[kafka.admin.BrokerMetadata], zkUtils: ZkUtils): Seq[Broker] = {
val brokers = brokerMetadatas.map { b =>
val protocol = SecurityProtocol.PLAINTEXT
Broker(b.id, Map(protocol -> EndPoint("localhost", 6667, protocol)).toMap, b.rack)
}
brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1,
rack = b.rack, ApiVersion.latestVersion))
brokers
}

View File

@ -21,6 +21,11 @@
0.10.0.0 has <a href="#upgrade_10_breaking">potential breaking changes</a> (please review before upgrading) and
there may be a <a href="#upgrade_10_performance_impact">performance impact during the upgrade</a>. Because new protocols
are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.
<p/>
<b>Notes to clients with version 0.9.0.0: </b>Due to a bug introduced in 0.9.0.0,
clients that depend on ZooKeeper (old Scala high-level Consumer and MirrorMaker if used with the old consumer) will not
work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9.0.1 <b>before</b> brokers are upgraded to
0.10.0.x. This step is not necessary for 0.8.X or 0.9.0.1 clients.
<p><b>For a rolling upgrade:</b></p>