KAFKA-1740: merge offset manager into consumer coordinator; reviewed by Onur Karaman and Jason Gustafson

Guozhang Wang 2015-07-02 11:41:51 -07:00
parent 14e0ce0a47
commit 3f8480ccfb
23 changed files with 357 additions and 207 deletions
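At a glance, the change moves offset storage behind the consumer coordinator: KafkaApis no longer talks to OffsetManager directly, the coordinator constructs and shuts down its own OffsetManager, and it validates group membership and generation before delegating commits. Below is a minimal, self-contained toy sketch of that ownership shape -- not Kafka's actual classes; only the error codes 0 (NONE) and 22 (ILLEGAL_GENERATION) are taken from the hunks in this commit, everything else is illustrative:

object CoordinatorSketch {
  type TopicPartition = (String, Int)

  // stands in for OffsetManager: a plain offset store with no group checks
  final class OffsetStore {
    private val offsets = scala.collection.mutable.Map.empty[(String, TopicPartition), Long]
    def store(group: String, update: Map[TopicPartition, Long]): Map[TopicPartition, Short] = {
      update.foreach { case (tp, offset) => offsets((group, tp)) = offset }
      update.map { case (tp, _) => tp -> 0.toShort }          // 0 == Errors.NONE
    }
    def fetch(group: String, tps: Seq[TopicPartition]): Map[TopicPartition, Long] =
      tps.map(tp => tp -> offsets.getOrElse((group, tp), -1L)).toMap
  }

  // stands in for ConsumerCoordinator: owns the store and checks the generation first
  final class Coordinator(store: OffsetStore) {
    private val generations = scala.collection.mutable.Map.empty[String, Int]
    def join(group: String): Int = {
      val generation = generations.getOrElse(group, 0) + 1
      generations(group) = generation
      generation
    }
    def commitOffsets(group: String, generation: Int,
                      update: Map[TopicPartition, Long]): Map[TopicPartition, Short] =
      if (generations.get(group).exists(_ != generation))
        update.map { case (tp, _) => tp -> 22.toShort }       // 22 == Errors.ILLEGAL_GENERATION
      else
        store.store(group, update)                            // delegate to the offset store
    def fetchOffsets(group: String, tps: Seq[TopicPartition]): Map[TopicPartition, Long] =
      store.fetch(group, tps)
  }

  def main(args: Array[String]): Unit = {
    val coordinator = new Coordinator(new OffsetStore)
    val generation = coordinator.join("demo-group")
    println(coordinator.commitOffsets("demo-group", generation, Map(("demo-topic", 0) -> 42L)))
    println(coordinator.fetchOffsets("demo-group", Seq(("demo-topic", 0))))
    // a commit with a stale generation is rejected before reaching the store
    println(coordinator.commitOffsets("demo-group", generation - 1, Map(("demo-topic", 0) -> 43L)))
  }
}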

View File

@ -290,9 +290,10 @@ public final class Coordinator {
// re-discover the coordinator and retry
coordinatorDead();
future.retryWithNewCoordinator();
} else if (data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
// just ignore this partition
log.debug("Unknown topic or partition for " + tp);
} else if (data.errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
|| data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
// need to re-join group
subscriptions.needReassignment();
} else {
future.raise(new KafkaException("Unexpected error in fetch offset response: "
+ Errors.forCode(data.errorCode).exception().getMessage()));
@ -499,13 +500,23 @@ public final class Coordinator {
|| errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
coordinatorDead();
future.retryWithNewCoordinator();
} else {
} else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code()
|| errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) {
// do not need to throw the exception but just log the error
future.retryAfterBackoff();
log.error("Error committing partition {} at offset {}: {}",
tp,
offset,
Errors.forCode(errorCode).exception().getMessage());
tp,
offset,
Errors.forCode(errorCode).exception().getMessage());
} else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
|| errorCode == Errors.ILLEGAL_GENERATION.code()) {
// need to re-join group
subscriptions.needReassignment();
} else {
// re-throw the exception as these should not happen
log.error("Error committing partition {} at offset {}: {}",
tp,
offset,
Errors.forCode(errorCode).exception().getMessage());
}
}
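For orientation, the commit-response handling above now falls into four buckets: retry against a newly discovered coordinator, log and continue for oversized commits, rejoin the group on a stale consumer id or generation, and log anything else as unexpected. A small self-contained sketch of that decision table follows; the numeric codes are copied from the Errors hunks in this commit, while the Action names are illustrative rather than the client's actual types:

object CommitErrorSketch {
  sealed trait Action
  case object RetryWithNewCoordinator extends Action  // coordinator moved or is unavailable
  case object LogAndContinue extends Action           // offset metadata/commit too large
  case object RejoinGroup extends Action               // stale consumer id or generation
  case object LogUnexpected extends Action              // should not happen

  // numeric codes as listed in org.apache.kafka.common.protocol.Errors
  val OffsetMetadataTooLarge: Short = 12
  val ConsumerCoordinatorNotAvailable: Short = 15
  val NotCoordinatorForConsumer: Short = 16
  val IllegalGeneration: Short = 22
  val UnknownConsumerId: Short = 25
  val InvalidCommitOffsetSize: Short = 28

  def onCommitError(code: Short): Action = code match {
    case ConsumerCoordinatorNotAvailable | NotCoordinatorForConsumer => RetryWithNewCoordinator
    case OffsetMetadataTooLarge | InvalidCommitOffsetSize            => LogAndContinue
    case UnknownConsumerId | IllegalGeneration                       => RejoinGroup
    case _                                                           => LogUnexpected
  }

  def main(args: Array[String]): Unit =
    Seq[Short](16, 12, 25, 1).foreach(code => println(s"$code -> ${onCommitError(code)}"))
}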

View File

@ -77,7 +77,11 @@ public enum Errors {
UNKNOWN_CONSUMER_ID(25,
new ApiException("The coordinator is not aware of this consumer.")),
INVALID_SESSION_TIMEOUT(26,
new ApiException("The session timeout is not within an acceptable range."));
new ApiException("The session timeout is not within an acceptable range.")),
COMMITTING_PARTITIONS_NOT_ASSIGNED(27,
new ApiException("Some of the committing partitions are not assigned the committer")),
INVALID_COMMIT_OFFSET_SIZE(28,
new ApiException("The committing offset data size is not valid"));
private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();

View File

@ -41,7 +41,13 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
/**
* Possible error code:
*
* TODO
* OFFSET_METADATA_TOO_LARGE (12)
* CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR_FOR_CONSUMER (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_CONSUMER_ID (25)
* COMMITTING_PARTITIONS_NOT_ASSIGNED (27)
* INVALID_COMMIT_OFFSET_SIZE (28)
*/
private final Map<TopicPartition, Short> responseData;

View File

@ -42,9 +42,6 @@ public class OffsetFetchRequest extends AbstractRequest {
// partition level field names
private static final String PARTITION_KEY_NAME = "partition";
public static final int DEFAULT_GENERATION_ID = -1;
public static final String DEFAULT_CONSUMER_ID = "";
private final String groupId;
private final List<TopicPartition> partitions;

View File

@ -47,10 +47,11 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
/**
* Possible error code:
*
* UNKNOWN_TOPIC_OR_PARTITION (3)
* UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0
* OFFSET_LOAD_IN_PROGRESS (14)
* NOT_COORDINATOR_FOR_CONSUMER (16)
* NO_OFFSETS_FETCHABLE (23)
* ILLEGAL_GENERATION (22)
* UNKNOWN_CONSUMER_ID (25)
*/
private final Map<TopicPartition, PartitionData> responseData;

View File

@ -333,13 +333,6 @@ public class CoordinatorTest {
assertTrue(result.isDone());
assertTrue(result.value().isEmpty());
// fetch with offset topic unknown
client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L));
result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
client.poll(0, time.milliseconds());
assertTrue(result.isDone());
assertTrue(result.value().isEmpty());
// fetch with offset -1
client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());

View File

@ -27,8 +27,8 @@ import scala.collection._
import scala.collection.JavaConversions._
import kafka.log.LogConfig
import kafka.consumer.Whitelist
import kafka.server.OffsetManager
import org.apache.kafka.common.utils.Utils
import kafka.coordinator.ConsumerCoordinator
object TopicCommand {
@ -111,7 +111,7 @@ object TopicCommand {
println("Updated config for topic \"%s\".".format(topic))
}
if(opts.options.has(opts.partitionsOpt)) {
if (topic == OffsetManager.OffsetsTopicName) {
if (topic == ConsumerCoordinator.OffsetsTopicName) {
throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
}
println("WARNING: If partitions are increased for a topic that has a key, the partition " +

View File

@ -22,7 +22,7 @@ import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
import kafka.admin.AdminUtils
import kafka.api.{PartitionStateInfo, LeaderAndIsr}
import kafka.log.LogConfig
import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, LogReadResult, ReplicaManager}
import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, LogReadResult, ReplicaManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import kafka.message.ByteBufferMessageSet
@ -160,8 +160,7 @@ class Partition(val topic: String,
* and setting the new leader and ISR
*/
def makeLeader(controllerId: Int,
partitionStateInfo: PartitionStateInfo, correlationId: Int,
offsetManager: OffsetManager): Boolean = {
partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
@ -186,8 +185,6 @@ class Partition(val topic: String,
if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult))
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(newLeaderReplica)
if (topic == OffsetManager.OffsetsTopicName)
offsetManager.loadOffsetsFromLog(partitionId)
true
}
}
@ -198,7 +195,7 @@ class Partition(val topic: String,
*/
def makeFollower(controllerId: Int,
partitionStateInfo: PartitionStateInfo,
correlationId: Int, offsetManager: OffsetManager): Boolean = {
correlationId: Int): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
@ -215,13 +212,6 @@ class Partition(val topic: String,
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion
leaderReplicaIdOpt.foreach { leaderReplica =>
if (topic == OffsetManager.OffsetsTopicName &&
/* if we are making a leader->follower transition */
leaderReplica == localBrokerId)
offsetManager.removeOffsetsFromCacheForPartition(partitionId)
}
if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
false
}

View File

@ -17,6 +17,8 @@
package kafka.common
import org.apache.kafka.common.protocol.Errors
case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata) {
override def toString = "OffsetMetadata[%d,%s]"
.format(offset,
@ -51,7 +53,7 @@ object OffsetAndMetadata {
def apply(offset: Long) = new OffsetAndMetadata(OffsetMetadata(offset, OffsetMetadata.NoMetadata))
}
case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = ErrorMapping.NoError) {
case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = Errors.NONE.code) {
def offset = offsetMetadata.offset
def metadata = offsetMetadata.metadata
@ -60,10 +62,12 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short =
}
object OffsetMetadataAndError {
val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NoError)
val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.OffsetsLoadInProgressCode)
val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.UnknownTopicOrPartitionCode)
val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NotCoordinatorForConsumerCode)
val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE.code)
val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.OFFSET_LOAD_IN_PROGRESS.code)
val UnknownConsumer = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_CONSUMER_ID.code)
val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION.code)
def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), ErrorMapping.NoError)

View File

@ -18,7 +18,7 @@
package kafka.common
import util.matching.Regex
import kafka.server.OffsetManager
import kafka.coordinator.ConsumerCoordinator
object Topic {
@ -26,7 +26,7 @@ object Topic {
private val maxNameLength = 255
private val rgx = new Regex(legalChars + "+")
val InternalTopics = Set(OffsetManager.OffsetsTopicName)
val InternalTopics = Set(ConsumerCoordinator.OffsetsTopicName)
def validate(topic: String) {
if (topic.length <= 0)

View File

@ -16,7 +16,9 @@
*/
package kafka.coordinator
import kafka.common.TopicAndPartition
import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition}
import kafka.message.UncompressedCodec
import kafka.log.LogConfig
import kafka.server._
import kafka.utils._
import org.apache.kafka.common.protocol.Errors
@ -24,7 +26,11 @@ import org.apache.kafka.common.requests.JoinGroupRequest
import org.I0Itec.zkclient.ZkClient
import java.util.concurrent.atomic.AtomicBoolean
import java.util.Properties
import scala.collection.{Map, Seq, immutable}
case class GroupManagerConfig(consumerMinSessionTimeoutMs: Int,
consumerMaxSessionTimeoutMs: Int)
/**
* ConsumerCoordinator handles consumer group and consumer offset management.
@ -33,11 +39,13 @@ import java.util.concurrent.atomic.AtomicBoolean
* consumer groups. Consumer groups are assigned to coordinators based on their
* group names.
*/
class ConsumerCoordinator(val config: KafkaConfig,
val zkClient: ZkClient,
val offsetManager: OffsetManager) extends Logging {
class ConsumerCoordinator(val brokerId: Int,
val groupConfig: GroupManagerConfig,
val offsetConfig: OffsetManagerConfig,
private val offsetManager: OffsetManager,
zkClient: ZkClient) extends Logging {
this.logIdent = "[ConsumerCoordinator " + config.brokerId + "]: "
this.logIdent = "[ConsumerCoordinator " + brokerId + "]: "
private val isActive = new AtomicBoolean(false)
@ -45,6 +53,22 @@ class ConsumerCoordinator(val config: KafkaConfig,
private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null
private var coordinatorMetadata: CoordinatorMetadata = null
def this(brokerId: Int,
groupConfig: GroupManagerConfig,
offsetConfig: OffsetManagerConfig,
replicaManager: ReplicaManager,
zkClient: ZkClient,
scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig,
new OffsetManager(offsetConfig, replicaManager, zkClient, scheduler), zkClient)
def offsetsTopicConfigs: Properties = {
val props = new Properties
props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
props
}
/**
* NOTE: If a group lock and metadataLock are simultaneously needed,
* be sure to acquire the group lock before metadataLock to prevent deadlock
@ -55,9 +79,9 @@ class ConsumerCoordinator(val config: KafkaConfig,
*/
def startup() {
info("Starting up.")
heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
coordinatorMetadata = new CoordinatorMetadata(config, zkClient, maybePrepareRebalance)
heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", brokerId)
coordinatorMetadata = new CoordinatorMetadata(brokerId, zkClient, maybePrepareRebalance)
isActive.set(true)
info("Startup complete.")
}
@ -69,6 +93,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
def shutdown() {
info("Shutting down.")
isActive.set(false)
offsetManager.shutdown()
coordinatorMetadata.shutdown()
heartbeatPurgatory.shutdown()
rebalancePurgatory.shutdown()
@ -87,7 +112,8 @@ class ConsumerCoordinator(val config: KafkaConfig,
responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
} else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) {
responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code)
} else if (sessionTimeoutMs < config.consumerMinSessionTimeoutMs || sessionTimeoutMs > config.consumerMaxSessionTimeoutMs) {
} else if (sessionTimeoutMs < groupConfig.consumerMinSessionTimeoutMs ||
sessionTimeoutMs > groupConfig.consumerMaxSessionTimeoutMs) {
responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code)
} else {
// only try to create the group if the group is not unknown AND
@ -196,6 +222,75 @@ class ConsumerCoordinator(val config: KafkaConfig,
}
}
def handleCommitOffsets(groupId: String,
consumerId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
if (!isActive.get) {
responseCallback(offsetMetadata.mapValues(_ => Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code))
} else if (!isCoordinatorForGroup(groupId)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_CONSUMER.code))
} else {
val group = coordinatorMetadata.getGroup(groupId)
if (group == null) {
// if the group does not exist, this group is not relying on Kafka
// for partition management and has never sent a join-group request
// to the coordinator; in this case, blindly commit the offsets
offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback)
} else {
group synchronized {
if (group.is(Dead)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code))
} else if (!group.has(consumerId)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code))
} else if (generationId != group.generationId) {
responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
} else if (!offsetMetadata.keySet.subsetOf(group.get(consumerId).assignedTopicPartitions)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code))
} else {
offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback)
}
}
}
}
}
def handleFetchOffsets(groupId: String,
partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
if (!isActive.get) {
partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap
} else if (!isCoordinatorForGroup(groupId)) {
partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap
} else {
val group = coordinatorMetadata.getGroup(groupId)
if (group == null) {
// if the group does not exist, this group is not relying on Kafka
// for partition management and has never sent a join-group request
// to the coordinator; in this case, blindly fetch the offsets
offsetManager.getOffsets(groupId, partitions)
} else {
group synchronized {
if (group.is(Dead)) {
partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)}.toMap
} else {
offsetManager.getOffsets(groupId, partitions)
}
}
}
}
}
def handleGroupImmigration(offsetTopicPartitionId: Int) = {
// TODO we may need to add more logic in KAFKA-2017
offsetManager.loadOffsetsFromLog(offsetTopicPartitionId)
}
def handleGroupEmigration(offsetTopicPartitionId: Int) = {
// TODO we may need to add more logic in KAFKA-2017
offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId)
}
/**
* Complete existing DelayedHeartbeats for the given consumer and schedule the next one
*/
@ -246,8 +341,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
private def prepareRebalance(group: ConsumerGroupMetadata) {
group.transitionTo(PreparingRebalance)
group.generationId += 1
info("Preparing to rebalance group %s generation %s".format(group.groupId, group.generationId))
info("Preparing to rebalance group %s with old generation %s".format(group.groupId, group.generationId))
val rebalanceTimeout = group.rebalanceTimeout
val delayedRebalance = new DelayedRebalance(this, group, rebalanceTimeout)
@ -259,7 +353,9 @@ class ConsumerCoordinator(val config: KafkaConfig,
assert(group.notYetRejoinedConsumers == List.empty[ConsumerMetadata])
group.transitionTo(Rebalancing)
info("Rebalancing group %s generation %s".format(group.groupId, group.generationId))
group.generationId += 1
info("Rebalancing group %s with new generation %s".format(group.groupId, group.generationId))
val assignedPartitionsPerConsumer = reassignPartitions(group)
trace("Rebalance for group %s generation %s has assigned partitions: %s"
@ -275,8 +371,6 @@ class ConsumerCoordinator(val config: KafkaConfig,
maybePrepareRebalance(group)
}
private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId))
private def reassignPartitions(group: ConsumerGroupMetadata) = {
val assignor = PartitionAssignor.createInstance(group.partitionAssignmentStrategy)
val topicsPerConsumer = group.topicsPerConsumer
@ -345,8 +439,54 @@ class ConsumerCoordinator(val config: KafkaConfig,
}
}
def onCompleteHeartbeat() {}
def onCompleteHeartbeat() {
// TODO: add metrics for complete heartbeats
}
def partitionFor(group: String): Int = offsetManager.partitionFor(group)
private def shouldKeepConsumerAlive(consumer: ConsumerMetadata, heartbeatDeadline: Long) =
consumer.awaitingRebalanceCallback != null || consumer.latestHeartbeat + consumer.sessionTimeoutMs > heartbeatDeadline
private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId))
}
object ConsumerCoordinator {
val OffsetsTopicName = "__consumer_offsets"
def create(config: KafkaConfig,
zkClient: ZkClient,
replicaManager: ReplicaManager,
kafkaScheduler: KafkaScheduler): ConsumerCoordinator = {
val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
offsetsTopicNumPartitions = config.offsetsTopicPartitions,
offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkClient, kafkaScheduler)
}
def create(config: KafkaConfig,
zkClient: ZkClient,
offsetManager: OffsetManager): ConsumerCoordinator = {
val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
offsetsTopicNumPartitions = config.offsetsTopicPartitions,
offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkClient)
}
}

View File

@ -32,7 +32,7 @@ import scala.collection.mutable
* It delegates all group logic to the callers.
*/
@threadsafe
private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
private[coordinator] class CoordinatorMetadata(brokerId: Int,
zkClient: ZkClient,
maybePrepareRebalance: ConsumerGroupMetadata => Unit) {
@ -179,7 +179,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
* Zookeeper listener to handle topic partition changes
*/
class TopicPartitionChangeListener extends IZkDataListener with Logging {
this.logIdent = "[TopicPartitionChangeListener on Coordinator " + config.brokerId + "]: "
this.logIdent = "[TopicPartitionChangeListener on Coordinator " + brokerId + "]: "
override def handleDataChange(dataPath: String, data: Object) {
info("Handling data change for path: %s data: %s".format(dataPath, data))

View File

@ -37,7 +37,6 @@ import org.I0Itec.zkclient.ZkClient
*/
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val offsetManager: OffsetManager,
val coordinator: ConsumerCoordinator,
val controller: KafkaController,
val zkClient: ZkClient,
@ -95,8 +94,23 @@ class KafkaApis(val requestChannel: RequestChannel,
// stop serving data to clients for the topic being deleted
val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
try {
val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)
val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
// call replica manager to handle updating partitions to become leader or follower
val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode)
// for each new leader or follower, call coordinator to handle
// consumer group migration
result.updatedLeaders.foreach { case partition =>
if (partition.topic == ConsumerCoordinator.OffsetsTopicName)
coordinator.handleGroupImmigration(partition.partitionId)
}
result.updatedFollowers.foreach { case partition =>
partition.leaderReplicaIdOpt.foreach { leaderReplica =>
if (partition.topic == ConsumerCoordinator.OffsetsTopicName &&
leaderReplica == brokerId)
coordinator.handleGroupEmigration(partition.partitionId)
}
}
requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, leaderAndIsrResponse)))
} catch {
case e: KafkaStorageException =>
@ -142,6 +156,12 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleOffsetCommitRequest(request: RequestChannel.Request) {
val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
// filter out non-existent topics
val invalidRequestsInfo = offsetCommitRequest.requestInfo.filter { case (topicAndPartition, offsetMetadata) =>
!metadataCache.contains(topicAndPartition.topic)
}
val filteredRequestInfo = (offsetCommitRequest.requestInfo -- invalidRequestsInfo.keys)
// the callback for sending an offset commit response
def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
commitStatus.foreach { case (topicAndPartition, errorCode) =>
@ -154,14 +174,14 @@ class KafkaApis(val requestChannel: RequestChannel,
topicAndPartition, ErrorMapping.exceptionNameFor(errorCode)))
}
}
val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId)
val combinedCommitStatus = commitStatus ++ invalidRequestsInfo.map(_._1 -> ErrorMapping.UnknownTopicOrPartitionCode)
val response = OffsetCommitResponse(combinedCommitStatus, offsetCommitRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
}
if (offsetCommitRequest.versionId == 0) {
// for version 0 always store offsets to ZK
val responseInfo = offsetCommitRequest.requestInfo.map {
val responseInfo = filteredRequestInfo.map {
case (topicAndPartition, metaAndError) => {
val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
try {
@ -189,7 +209,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetRetention =
if (offsetCommitRequest.versionId <= 1 ||
offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) {
offsetManager.config.offsetsRetentionMs
coordinator.offsetConfig.offsetsRetentionMs
} else {
offsetCommitRequest.retentionMs
}
@ -203,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val currentTimestamp = SystemTime.milliseconds
val defaultExpireTimestamp = offsetRetention + currentTimestamp
val offsetData = offsetCommitRequest.requestInfo.mapValues(offsetAndMetadata =>
val offsetData = filteredRequestInfo.mapValues(offsetAndMetadata =>
offsetAndMetadata.copy(
commitTimestamp = currentTimestamp,
expireTimestamp = {
@ -215,8 +235,8 @@ class KafkaApis(val requestChannel: RequestChannel,
)
)
// call offset manager to store offsets
offsetManager.storeOffsets(
// call coordinator to handle commit offset
coordinator.handleCommitOffsets(
offsetCommitRequest.groupId,
offsetCommitRequest.consumerId,
offsetCommitRequest.groupGenerationId,
@ -422,9 +442,9 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topics.size > 0 && topicResponses.size != topics.size) {
val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) {
if (topic == ConsumerCoordinator.OffsetsTopicName || config.autoCreateTopicsEnable) {
try {
if (topic == OffsetManager.OffsetsTopicName) {
if (topic == ConsumerCoordinator.OffsetsTopicName) {
val aliveBrokers = metadataCache.getAliveBrokers
val offsetsTopicReplicationFactor =
if (aliveBrokers.length > 0)
@ -433,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel,
config.offsetsTopicReplicationFactor.toInt
AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
offsetsTopicReplicationFactor,
offsetManager.offsetsTopicConfig)
coordinator.offsetsTopicConfigs)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
.format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor))
}
@ -496,26 +516,19 @@ class KafkaApis(val requestChannel: RequestChannel,
OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId)
} else {
// version 1 reads offsets from Kafka
val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
)
val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
val knownStatus =
if (knownTopicPartitions.size > 0)
offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
else
Map.empty[TopicAndPartition, OffsetMetadataAndError]
val status = unknownStatus ++ knownStatus
// version 1 reads offsets from Kafka;
val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, offsetFetchRequest.requestInfo).toMap
OffsetFetchResponse(status, offsetFetchRequest.correlationId)
// Note that we do not need to filter the partitions in the
// metadata cache as the topic partitions will be filtered
// in coordinator's offset manager through the offset cache
OffsetFetchResponse(offsets, offsetFetchRequest.correlationId)
}
trace("Sending offset fetch response %s for correlation id %d to client %s."
.format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
}
/*
@ -524,10 +537,10 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleConsumerMetadataRequest(request: RequestChannel.Request) {
val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest]
val partition = offsetManager.partitionFor(consumerMetadataRequest.group)
val partition = coordinator.partitionFor(consumerMetadataRequest.group)
// get metadata (and create the topic if necessary)
val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol).head
val offsetsTopicMetadata = getTopicMetadata(Set(ConsumerCoordinator.OffsetsTopicName), request.securityProtocol).head
val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId)

View File

@ -41,7 +41,7 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker
import kafka.network.{BlockingChannel, SocketServer}
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
import kafka.coordinator.ConsumerCoordinator
import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator}
/**
* Represents the lifecycle of a single Kafka broker. Handles all functionality required
@ -75,8 +75,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
var logManager: LogManager = null
var offsetManager: OffsetManager = null
var replicaManager: ReplicaManager = null
var topicConfigManager: TopicConfigManager = null
@ -157,19 +155,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
replicaManager.startup()
/* start offset manager */
offsetManager = createOffsetManager()
/* start kafka controller */
kafkaController = new KafkaController(config, zkClient, brokerState)
kafkaController.startup()
/* start kafka coordinator */
consumerCoordinator = new ConsumerCoordinator(config, zkClient, offsetManager)
consumerCoordinator = ConsumerCoordinator.create(config, zkClient, replicaManager, kafkaScheduler)
consumerCoordinator.startup()
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator,
apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
brokerState.newState(RunningAsBroker)
@ -349,8 +344,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
CoreUtils.swallow(socketServer.shutdown())
if(requestHandlerPool != null)
CoreUtils.swallow(requestHandlerPool.shutdown())
if(offsetManager != null)
offsetManager.shutdown()
CoreUtils.swallow(kafkaScheduler.shutdown())
if(apis != null)
CoreUtils.swallow(apis.close())
@ -450,19 +443,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
logProps
}
private def createOffsetManager(): OffsetManager = {
val offsetManagerConfig = OffsetManagerConfig(
maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
offsetsTopicNumPartitions = config.offsetsTopicPartitions,
offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler, metadataCache)
}
/**
* Generates new brokerId or reads from meta.properties based on following conditions
* <ol>

View File

@ -17,6 +17,7 @@
package kafka.server
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.types.{Struct, Schema, Field}
import org.apache.kafka.common.protocol.types.Type.STRING
import org.apache.kafka.common.protocol.types.Type.INT32
@ -25,19 +26,19 @@ import org.apache.kafka.common.utils.Utils
import kafka.utils._
import kafka.common._
import kafka.log.{FileMessageSet, LogConfig}
import kafka.log.FileMessageSet
import kafka.message._
import kafka.metrics.KafkaMetricsGroup
import kafka.common.TopicAndPartition
import kafka.tools.MessageFormatter
import kafka.api.ProducerResponseStatus
import kafka.coordinator.ConsumerCoordinator
import scala.Some
import scala.collection._
import java.io.PrintStream
import java.util.concurrent.atomic.AtomicBoolean
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.Gauge
@ -87,8 +88,7 @@ object OffsetManagerConfig {
class OffsetManager(val config: OffsetManagerConfig,
replicaManager: ReplicaManager,
zkClient: ZkClient,
scheduler: Scheduler,
metadataCache: MetadataCache) extends Logging with KafkaMetricsGroup {
scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
/* offsets and metadata cache */
private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
@ -143,9 +143,9 @@ class OffsetManager(val config: OffsetManagerConfig,
// Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
// if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
partitionOpt.map { partition =>
val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
val appendPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
val messages = tombstones.map(_._2).toSeq
trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
@ -170,14 +170,6 @@ class OffsetManager(val config: OffsetManagerConfig,
}
def offsetsTopicConfig: Properties = {
val props = new Properties
props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
props.put(LogConfig.CleanupPolicyProp, "compact")
props.put(LogConfig.CompressionTypeProp, "uncompressed")
props
}
def partitionFor(group: String): Int = Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions
/**
@ -214,22 +206,14 @@ class OffsetManager(val config: OffsetManagerConfig,
/**
* Store offsets by appending it to the replicated log and then inserting to cache
*/
// TODO: generation id and consumer id is needed by coordinator to do consumer checking in the future
def storeOffsets(groupId: String,
consumerId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
// check if there are any non-existent topics
val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) =>
!metadataCache.contains(topicAndPartition.topic)
}
// first filter out partitions with offset metadata size exceeding limit or
// if its a non existing topic
// TODO: in the future we may want to only support atomic commit and hence fail the whole commit
// first filter out partitions with offset metadata size exceeding limit
val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition)
validateOffsetMetadataLength(offsetAndMetadata.metadata)
}
// construct the message set to append
@ -240,7 +224,7 @@ class OffsetManager(val config: OffsetManagerConfig,
)
}.toSeq
val offsetTopicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionFor(groupId))
val offsetTopicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, partitionFor(groupId))
val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@ -271,6 +255,10 @@ class OffsetManager(val config: OffsetManagerConfig,
ErrorMapping.ConsumerCoordinatorNotAvailableCode
else if (status.error == ErrorMapping.NotLeaderForPartitionCode)
ErrorMapping.NotCoordinatorForConsumerCode
else if (status.error == ErrorMapping.MessageSizeTooLargeCode
|| status.error == ErrorMapping.MessageSetSizeTooLargeCode
|| status.error == ErrorMapping.InvalidFetchSizeCode)
Errors.INVALID_COMMIT_OFFSET_SIZE.code
else
status.error
}
@ -278,9 +266,7 @@ class OffsetManager(val config: OffsetManagerConfig,
// compute the final error codes for the commit response
val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
if (nonExistentTopics.contains(topicAndPartition))
(topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
else if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
(topicAndPartition, responseCode)
else
(topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
@ -338,7 +324,7 @@ class OffsetManager(val config: OffsetManagerConfig,
debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
topicPartitions.map { topicAndPartition =>
val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
(groupTopicPartition.topicPartition, OffsetMetadataAndError.NotOffsetManagerForGroup)
(groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup)
}.toMap
}
}
@ -349,7 +335,7 @@ class OffsetManager(val config: OffsetManagerConfig,
*/
def loadOffsetsFromLog(offsetsPartition: Int) {
val topicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
val topicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
loadingPartitions synchronized {
if (loadingPartitions.contains(offsetsPartition)) {
@ -421,7 +407,7 @@ class OffsetManager(val config: OffsetManagerConfig,
}
private def getHighWatermark(partitionId: Int): Long = {
val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, partitionId)
val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, partitionId)
val hw = partitionOpt.map { partition =>
partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@ -449,7 +435,7 @@ class OffsetManager(val config: OffsetManagerConfig,
}
if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
.format(numRemoved, TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)))
.format(numRemoved, TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)))
}
@ -461,8 +447,6 @@ class OffsetManager(val config: OffsetManagerConfig,
object OffsetManager {
val OffsetsTopicName = "__consumer_offsets"
private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
private val CURRENT_OFFSET_SCHEMA_VERSION = 1.toShort
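The group-to-partition mapping that decides which __consumer_offsets partition owns a group stays in OffsetManager (partitionFor above) and is now re-exported by the coordinator. A tiny self-contained sketch of that mapping, using hypothetical group names and assuming the default of 50 offsets-topic partitions; the real code uses Utils.abs, which also handles Integer.MIN_VALUE safely (math.abs alone would not):

object PartitionForSketch {
  // mirrors: Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions
  def partitionFor(group: String, offsetsTopicNumPartitions: Int = 50): Int =
    math.abs(group.hashCode) % offsetsTopicNumPartitions

  def main(args: Array[String]): Unit = {
    // every commit/fetch for a given group lands on the same offsets partition
    Seq("demo-group", "another-group").foreach { group =>
      println(s"$group -> partition ${partitionFor(group)}")
    }
  }
}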

View File

@ -23,19 +23,19 @@ import kafka.cluster.{BrokerEndPoint, Partition, Replica}
import kafka.log.{LogAppendInfo, LogManager}
import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import kafka.common.TopicAndPartition
import kafka.message.{ByteBufferMessageSet, MessageSet}
import kafka.api.ProducerResponseStatus
import kafka.common.TopicAndPartition
import kafka.api.PartitionFetchInfo
import org.apache.kafka.common.protocol.Errors
import java.util.concurrent.atomic.AtomicBoolean
import java.io.{IOException, File}
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.protocol.Errors
import scala.Predef._
import scala.Some
import scala.collection._
import scala.collection.mutable.HashMap
import scala.collection.Map
import scala.collection.Set
import org.I0Itec.zkclient.ZkClient
import com.yammer.metrics.core.Gauge
@ -84,6 +84,17 @@ object LogReadResult {
false)
}
case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[(String, Int), Short],
updatedLeaders: Set[Partition],
updatedFollowers: Set[Partition],
errorCode: Short) {
override def toString = {
"updated leaders: [%s], updated followers: [%s], update results: [%s], global error: [%d]"
.format(updatedLeaders, updatedFollowers, responseMap, errorCode)
}
}
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"
}
@ -393,10 +404,10 @@ class ReplicaManager(val config: KafkaConfig,
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
case nle: NotLeaderForPartitionException =>
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
case mtl: MessageSizeTooLargeException =>
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtl)))
case mstl: MessageSetSizeTooLargeException =>
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstl)))
case mtle: MessageSizeTooLargeException =>
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle)))
case mstle: MessageSetSizeTooLargeException =>
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
case imse : InvalidMessageSizeException =>
(topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
case t: Throwable =>
@ -416,7 +427,7 @@ class ReplicaManager(val config: KafkaConfig,
def fetchMessages(timeout: Long,
replicaId: Int,
fetchMinBytes: Int,
fetchInfo: Map[TopicAndPartition, PartitionFetchInfo],
fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo],
responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
val isFromFollower = replicaId >= 0
@ -544,30 +555,29 @@ class ReplicaManager(val config: KafkaConfig,
}
}
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = {
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): BecomeLeaderOrFollowerResult = {
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
.format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition))
}
replicaStateChangeLock synchronized {
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
val responseMap = new mutable.HashMap[(String, Int), Short]
if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
"its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
}
(responseMap, ErrorMapping.StaleControllerEpochCode)
BecomeLeaderOrFollowerResult(responseMap, Set.empty[Partition], Set.empty[Partition], ErrorMapping.StaleControllerEpochCode)
} else {
val controllerId = leaderAndISRRequest.controllerId
val correlationId = leaderAndISRRequest.correlationId
controllerEpoch = leaderAndISRRequest.controllerEpoch
// First check partition's leader epoch
val partitionState = new HashMap[Partition, PartitionStateInfo]()
leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
val partitionState = new mutable.HashMap[Partition, PartitionStateInfo]()
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partitionId), partitionStateInfo) =>
val partition = getOrCreatePartition(topic, partitionId)
val partitionLeaderEpoch = partition.getLeaderEpoch()
// If the leader epoch is valid record the epoch of the controller that made the leadership decision.
@ -591,14 +601,19 @@ class ReplicaManager(val config: KafkaConfig,
}
}
val partitionsTobeLeader = partitionState
.filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
val partitionsTobeLeader = partitionState.filter { case (partition, partitionStateInfo) =>
partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId
}
val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
if (!partitionsTobeLeader.isEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager)
if (!partitionsToBeFollower.isEmpty)
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager)
val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
else
Set.empty[Partition]
val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
else
Set.empty[Partition]
// we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
// have been completely populated before starting the checkpointing there by avoiding weird race conditions
@ -607,7 +622,7 @@ class ReplicaManager(val config: KafkaConfig,
hwThreadInitialized = true
}
replicaFetcherManager.shutdownIdleFetcherThreads()
(responseMap, ErrorMapping.NoError)
BecomeLeaderOrFollowerResult(responseMap, partitionsBecomeLeader, partitionsBecomeFollower, ErrorMapping.NoError)
}
}
}
@ -623,10 +638,11 @@ class ReplicaManager(val config: KafkaConfig,
* the error message will be set on each partition since we do not know which partition caused it
* TODO: the above may need to be fixed later
*/
private def makeLeaders(controllerId: Int, epoch: Int,
private def makeLeaders(controllerId: Int,
epoch: Int,
partitionState: Map[Partition, PartitionStateInfo],
correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
offsetManager: OffsetManager) = {
correlationId: Int,
responseMap: mutable.Map[(String, Int), Short]): Set[Partition] = {
partitionState.foreach(state =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-leader transition for partition %s")
@ -645,7 +661,7 @@ class ReplicaManager(val config: KafkaConfig,
}
// Update the partition information to be the leader
partitionState.foreach{ case (partition, partitionStateInfo) =>
partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
partition.makeLeader(controllerId, partitionStateInfo, correlationId)}
} catch {
case e: Throwable =>
@ -664,6 +680,8 @@ class ReplicaManager(val config: KafkaConfig,
"for the become-leader transition for partition %s")
.format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
}
partitionState.keySet
}
/*
@ -682,9 +700,12 @@ class ReplicaManager(val config: KafkaConfig,
* If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
* the error message will be set on each partition since we do not know which partition caused it
*/
private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
leaders: Set[BrokerEndPoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
offsetManager: OffsetManager) {
private def makeFollowers(controllerId: Int,
epoch: Int,
partitionState: Map[Partition, PartitionStateInfo],
leaders: Set[BrokerEndPoint],
correlationId: Int,
responseMap: mutable.Map[(String, Int), Short]) : Set[Partition] = {
partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partition %s")
@ -694,18 +715,18 @@ class ReplicaManager(val config: KafkaConfig,
for (partition <- partitionState.keys)
responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
try {
var partitionsToMakeFollower: Set[Partition] = Set()
// TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
// TODO: Delete leaders from LeaderAndIsrRequest
partitionState.foreach{ case (partition, partitionStateInfo) =>
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
leaders.find(_.id == newLeaderBrokerId) match {
// Only change partition state when the leader is available
case Some(leaderBroker) =>
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
partitionsToMakeFollower += partition
else
stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
@ -775,6 +796,8 @@ class ReplicaManager(val config: KafkaConfig,
"for the become-follower transition for partition %s")
.format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
}
partitionsToMakeFollower
}
private def maybeShrinkIsr(): Unit = {

View File

@ -25,12 +25,13 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
import kafka.utils.{TestUtils, Logging}
import kafka.server.{KafkaConfig, OffsetManager}
import kafka.server.KafkaConfig
import java.util.ArrayList
import org.junit.Assert._
import scala.collection.JavaConversions._
import kafka.coordinator.ConsumerCoordinator
/**
@ -158,9 +159,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
consumer0.poll(50)
// get metadata for the topic
var parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName)
while(parts == null)
parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName)
assertEquals(1, parts.size)
assertNotNull(parts(0).leader())

View File

@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
import kafka.server.{OffsetManager, KafkaConfig}
import kafka.integration.KafkaServerTestHarness
import scala.collection.mutable.Buffer
import kafka.coordinator.ConsumerCoordinator
/**
* A helper class for writing integration tests that involve producers, consumers, and servers
@ -63,11 +64,11 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
consumers += new KafkaConsumer(consumerConfig)
// create the consumer offset topic
TestUtils.createTopic(zkClient, OffsetManager.OffsetsTopicName,
serverConfig.getProperty("offsets.topic.num.partitions").toInt,
serverConfig.getProperty("offsets.topic.replication.factor").toInt,
TestUtils.createTopic(zkClient, ConsumerCoordinator.OffsetsTopicName,
serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
servers,
servers(0).offsetManager.offsetsTopicConfig)
servers(0).consumerCoordinator.offsetsTopicConfigs)
}
override def tearDown() {

View File

@ -22,9 +22,9 @@ import org.scalatest.junit.JUnit3Suite
import kafka.utils.Logging
import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import kafka.server.{OffsetManager, KafkaConfig}
import kafka.admin.TopicCommand.TopicCommandOptions
import kafka.utils.ZkUtils
import kafka.coordinator.ConsumerCoordinator
class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
@ -87,12 +87,12 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--topic", OffsetManager.OffsetsTopicName))
"--topic", ConsumerCoordinator.OffsetsTopicName))
TopicCommand.createTopic(zkClient, createOffsetTopicOpts)
// try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't
val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", OffsetManager.OffsetsTopicName))
val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(OffsetManager.OffsetsTopicName)
val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", ConsumerCoordinator.OffsetsTopicName))
val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)

View File

@ -22,6 +22,7 @@ import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import kafka.server.OffsetManager
import kafka.coordinator.ConsumerCoordinator
class TopicFilterTest extends JUnitSuite {
@ -37,8 +38,8 @@ class TopicFilterTest extends JUnitSuite {
val topicFilter2 = new Whitelist(".+")
assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
assertFalse(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true))
assertTrue(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false))
assertFalse(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true))
assertTrue(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false))
val topicFilter3 = new Whitelist("white_listed-topic.+")
assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
@ -57,8 +58,8 @@ class TopicFilterTest extends JUnitSuite {
assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
assertFalse(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true))
assertTrue(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false))
assertFalse(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true))
assertTrue(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false))
}
@Test

View File

@ -22,8 +22,8 @@ import java.util.concurrent.TimeUnit
import junit.framework.Assert._
import kafka.common.TopicAndPartition
import kafka.server.{KafkaConfig, OffsetManager}
import kafka.utils.TestUtils
import kafka.server.{OffsetManager, ReplicaManager, KafkaConfig}
import kafka.utils.{KafkaScheduler, TestUtils}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.JoinGroupRequest
import org.easymock.EasyMock
@ -45,8 +45,8 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
val ConsumerMinSessionTimeout = 10
val ConsumerMaxSessionTimeout = 30
val DefaultSessionTimeout = 20
var offsetManager: OffsetManager = null
var consumerCoordinator: ConsumerCoordinator = null
var offsetManager : OffsetManager = null
@Before
def setUp() {
@ -54,12 +54,13 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
props.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
props.setProperty(KafkaConfig.ConsumerMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
offsetManager = EasyMock.createStrictMock(classOf[OffsetManager])
consumerCoordinator = new ConsumerCoordinator(KafkaConfig.fromProps(props), null, offsetManager)
consumerCoordinator = ConsumerCoordinator.create(KafkaConfig.fromProps(props), null, offsetManager)
consumerCoordinator.startup()
}
@After
def tearDown() {
EasyMock.reset(offsetManager)
consumerCoordinator.shutdown()
}

View File

@ -40,7 +40,7 @@ class CoordinatorMetadataTest extends JUnitSuite {
def setUp() {
val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
zkClient = EasyMock.createStrictMock(classOf[ZkClient])
coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props), zkClient, null)
coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkClient, null)
}
@Test

View File

@ -120,7 +120,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
val fetchRequest2 = OffsetFetchRequest(group, Seq(unknownTopicAndPartition))
val fetchResponse2 = simpleConsumer.fetchOffsets(fetchRequest2)
assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get)
assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get)
assertEquals(1, fetchResponse2.requestInfo.size)
}
@ -166,14 +166,14 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get)
assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get)
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get)
assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata)