diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index 68b4cb18445..c1c8172cd45 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -290,9 +290,10 @@ public final class Coordinator { // re-discover the coordinator and retry coordinatorDead(); future.retryWithNewCoordinator(); - } else if (data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { - // just ignore this partition - log.debug("Unknown topic or partition for " + tp); + } else if (data.errorCode == Errors.UNKNOWN_CONSUMER_ID.code() + || data.errorCode == Errors.ILLEGAL_GENERATION.code()) { + // need to re-join group + subscriptions.needReassignment(); } else { future.raise(new KafkaException("Unexpected error in fetch offset response: " + Errors.forCode(data.errorCode).exception().getMessage())); @@ -499,13 +500,23 @@ public final class Coordinator { || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) { coordinatorDead(); future.retryWithNewCoordinator(); - } else { + } else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code() + || errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) { // do not need to throw the exception but just log the error - future.retryAfterBackoff(); log.error("Error committing partition {} at offset {}: {}", - tp, - offset, - Errors.forCode(errorCode).exception().getMessage()); + tp, + offset, + Errors.forCode(errorCode).exception().getMessage()); + } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code() + || errorCode == Errors.ILLEGAL_GENERATION.code()) { + // need to re-join group + subscriptions.needReassignment(); + } else { + // re-throw the exception as these should not happen + log.error("Error committing partition {} at offset {}: {}", + tp, + offset, + Errors.forCode(errorCode).exception().getMessage()); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 5b898c8f8ad..4c0ecc3badd 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -77,7 +77,11 @@ public enum Errors { UNKNOWN_CONSUMER_ID(25, new ApiException("The coordinator is not aware of this consumer.")), INVALID_SESSION_TIMEOUT(26, - new ApiException("The session timeout is not within an acceptable range.")); + new ApiException("The session timeout is not within an acceptable range.")), + COMMITTING_PARTITIONS_NOT_ASSIGNED(27, + new ApiException("Some of the committing partitions are not assigned the committer")), + INVALID_COMMIT_OFFSET_SIZE(28, + new ApiException("The committing offset data size is not valid")); private static Map, Errors> classToError = new HashMap, Errors>(); private static Map codeToError = new HashMap(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index 70844d65369..a1633330dc0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -41,7 +41,13 @@ public class OffsetCommitResponse extends AbstractRequestResponse { /** * Possible error code: * - * TODO + * OFFSET_METADATA_TOO_LARGE (12) + * CONSUMER_COORDINATOR_NOT_AVAILABLE (15) + * NOT_COORDINATOR_FOR_CONSUMER (16) + * ILLEGAL_GENERATION (22) + * UNKNOWN_CONSUMER_ID (25) + * COMMITTING_PARTITIONS_NOT_ASSIGNED (27) + * INVALID_COMMIT_OFFSET_SIZE (28) */ private final Map responseData; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index b5e8a0ff0aa..6ee75973d64 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -42,9 +42,6 @@ public class OffsetFetchRequest extends AbstractRequest { // partition level field names private static final String PARTITION_KEY_NAME = "partition"; - public static final int DEFAULT_GENERATION_ID = -1; - public static final String DEFAULT_CONSUMER_ID = ""; - private final String groupId; private final List partitions; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 512a0ef7e61..3dc8521296e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -47,10 +47,11 @@ public class OffsetFetchResponse extends AbstractRequestResponse { /** * Possible error code: * - * UNKNOWN_TOPIC_OR_PARTITION (3) + * UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0 * OFFSET_LOAD_IN_PROGRESS (14) * NOT_COORDINATOR_FOR_CONSUMER (16) - * NO_OFFSETS_FETCHABLE (23) + * ILLEGAL_GENERATION (22) + * UNKNOWN_CONSUMER_ID (25) */ private final Map responseData; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java index 613b192ba84..d085fe5c9e2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java @@ -333,13 +333,6 @@ public class CoordinatorTest { assertTrue(result.isDone()); assertTrue(result.value().isEmpty()); - // fetch with offset topic unknown - client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L)); - result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()); - client.poll(0, time.milliseconds()); - assertTrue(result.isDone()); - assertTrue(result.value().isEmpty()); - // fetch with offset -1 client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L)); result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds()); diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index dacbdd089f9..a2ecb9620d6 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -27,8 +27,8 @@ import scala.collection._ import scala.collection.JavaConversions._ import kafka.log.LogConfig import kafka.consumer.Whitelist -import kafka.server.OffsetManager import org.apache.kafka.common.utils.Utils +import kafka.coordinator.ConsumerCoordinator object TopicCommand { @@ -111,7 +111,7 @@ object TopicCommand { println("Updated config for topic \"%s\".".format(topic)) } if(opts.options.has(opts.partitionsOpt)) { - if (topic == OffsetManager.OffsetsTopicName) { + if (topic == ConsumerCoordinator.OffsetsTopicName) { throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.") } println("WARNING: If partitions are increased for a topic that has a key, the partition " + diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 0990938b33b..2649090b6cb 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,7 +22,7 @@ import kafka.utils.CoreUtils.{inReadLock,inWriteLock} import kafka.admin.AdminUtils import kafka.api.{PartitionStateInfo, LeaderAndIsr} import kafka.log.LogConfig -import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, LogReadResult, ReplicaManager} +import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, LogReadResult, ReplicaManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController import kafka.message.ByteBufferMessageSet @@ -160,8 +160,7 @@ class Partition(val topic: String, * and setting the new leader and ISR */ def makeLeader(controllerId: Int, - partitionStateInfo: PartitionStateInfo, correlationId: Int, - offsetManager: OffsetManager): Boolean = { + partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = { inWriteLock(leaderIsrUpdateLock) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch @@ -186,8 +185,6 @@ class Partition(val topic: String, if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(newLeaderReplica) - if (topic == OffsetManager.OffsetsTopicName) - offsetManager.loadOffsetsFromLog(partitionId) true } } @@ -198,7 +195,7 @@ class Partition(val topic: String, */ def makeFollower(controllerId: Int, partitionStateInfo: PartitionStateInfo, - correlationId: Int, offsetManager: OffsetManager): Boolean = { + correlationId: Int): Boolean = { inWriteLock(leaderIsrUpdateLock) { val allReplicas = partitionStateInfo.allReplicas val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch @@ -215,13 +212,6 @@ class Partition(val topic: String, leaderEpoch = leaderAndIsr.leaderEpoch zkVersion = leaderAndIsr.zkVersion - leaderReplicaIdOpt.foreach { leaderReplica => - if (topic == OffsetManager.OffsetsTopicName && - /* if we are making a leader->follower transition */ - leaderReplica == localBrokerId) - offsetManager.removeOffsetsFromCacheForPartition(partitionId) - } - if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) { false } diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala index 6b4242c7cd1..deb48b1cee5 100644 --- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala +++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala @@ -17,6 +17,8 @@ package kafka.common +import org.apache.kafka.common.protocol.Errors + case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata) { override def toString = "OffsetMetadata[%d,%s]" .format(offset, @@ -51,7 +53,7 @@ object OffsetAndMetadata { def apply(offset: Long) = new OffsetAndMetadata(OffsetMetadata(offset, OffsetMetadata.NoMetadata)) } -case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = ErrorMapping.NoError) { +case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = Errors.NONE.code) { def offset = offsetMetadata.offset def metadata = offsetMetadata.metadata @@ -60,10 +62,12 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = } object OffsetMetadataAndError { - val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NoError) - val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.OffsetsLoadInProgressCode) - val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.UnknownTopicOrPartitionCode) - val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NotCoordinatorForConsumerCode) + val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE.code) + val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.OFFSET_LOAD_IN_PROGRESS.code) + val UnknownConsumer = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_CONSUMER_ID.code) + val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_CONSUMER.code) + val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION.code) def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), ErrorMapping.NoError) diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala index ad759786d1c..32595d6fe43 100644 --- a/core/src/main/scala/kafka/common/Topic.scala +++ b/core/src/main/scala/kafka/common/Topic.scala @@ -18,7 +18,7 @@ package kafka.common import util.matching.Regex -import kafka.server.OffsetManager +import kafka.coordinator.ConsumerCoordinator object Topic { @@ -26,7 +26,7 @@ object Topic { private val maxNameLength = 255 private val rgx = new Regex(legalChars + "+") - val InternalTopics = Set(OffsetManager.OffsetsTopicName) + val InternalTopics = Set(ConsumerCoordinator.OffsetsTopicName) def validate(topic: String) { if (topic.length <= 0) diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala index a385adbd7cb..476973b2c55 100644 --- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala @@ -16,7 +16,9 @@ */ package kafka.coordinator -import kafka.common.TopicAndPartition +import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition} +import kafka.message.UncompressedCodec +import kafka.log.LogConfig import kafka.server._ import kafka.utils._ import org.apache.kafka.common.protocol.Errors @@ -24,7 +26,11 @@ import org.apache.kafka.common.requests.JoinGroupRequest import org.I0Itec.zkclient.ZkClient import java.util.concurrent.atomic.AtomicBoolean +import java.util.Properties +import scala.collection.{Map, Seq, immutable} +case class GroupManagerConfig(consumerMinSessionTimeoutMs: Int, + consumerMaxSessionTimeoutMs: Int) /** * ConsumerCoordinator handles consumer group and consumer offset management. @@ -33,11 +39,13 @@ import java.util.concurrent.atomic.AtomicBoolean * consumer groups. Consumer groups are assigned to coordinators based on their * group names. */ -class ConsumerCoordinator(val config: KafkaConfig, - val zkClient: ZkClient, - val offsetManager: OffsetManager) extends Logging { +class ConsumerCoordinator(val brokerId: Int, + val groupConfig: GroupManagerConfig, + val offsetConfig: OffsetManagerConfig, + private val offsetManager: OffsetManager, + zkClient: ZkClient) extends Logging { - this.logIdent = "[ConsumerCoordinator " + config.brokerId + "]: " + this.logIdent = "[ConsumerCoordinator " + brokerId + "]: " private val isActive = new AtomicBoolean(false) @@ -45,6 +53,22 @@ class ConsumerCoordinator(val config: KafkaConfig, private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null private var coordinatorMetadata: CoordinatorMetadata = null + def this(brokerId: Int, + groupConfig: GroupManagerConfig, + offsetConfig: OffsetManagerConfig, + replicaManager: ReplicaManager, + zkClient: ZkClient, + scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig, + new OffsetManager(offsetConfig, replicaManager, zkClient, scheduler), zkClient) + + def offsetsTopicConfigs: Properties = { + val props = new Properties + props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString) + props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name) + props + } + /** * NOTE: If a group lock and metadataLock are simultaneously needed, * be sure to acquire the group lock before metadataLock to prevent deadlock @@ -55,9 +79,9 @@ class ConsumerCoordinator(val config: KafkaConfig, */ def startup() { info("Starting up.") - heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId) - rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId) - coordinatorMetadata = new CoordinatorMetadata(config, zkClient, maybePrepareRebalance) + heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId) + rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", brokerId) + coordinatorMetadata = new CoordinatorMetadata(brokerId, zkClient, maybePrepareRebalance) isActive.set(true) info("Startup complete.") } @@ -69,6 +93,7 @@ class ConsumerCoordinator(val config: KafkaConfig, def shutdown() { info("Shutting down.") isActive.set(false) + offsetManager.shutdown() coordinatorMetadata.shutdown() heartbeatPurgatory.shutdown() rebalancePurgatory.shutdown() @@ -87,7 +112,8 @@ class ConsumerCoordinator(val config: KafkaConfig, responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code) } else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) { responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code) - } else if (sessionTimeoutMs < config.consumerMinSessionTimeoutMs || sessionTimeoutMs > config.consumerMaxSessionTimeoutMs) { + } else if (sessionTimeoutMs < groupConfig.consumerMinSessionTimeoutMs || + sessionTimeoutMs > groupConfig.consumerMaxSessionTimeoutMs) { responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code) } else { // only try to create the group if the group is not unknown AND @@ -196,6 +222,75 @@ class ConsumerCoordinator(val config: KafkaConfig, } } + def handleCommitOffsets(groupId: String, + consumerId: String, + generationId: Int, + offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], + responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { + if (!isActive.get) { + responseCallback(offsetMetadata.mapValues(_ => Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code)) + } else if (!isCoordinatorForGroup(groupId)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_CONSUMER.code)) + } else { + val group = coordinatorMetadata.getGroup(groupId) + if (group == null) { + // if the group does not exist, it means this group is not relying + // on Kafka for partition management, and hence never send join-group + // request to the coordinator before; in this case blindly commit the offsets + offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback) + } else { + group synchronized { + if (group.is(Dead)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code)) + } else if (!group.has(consumerId)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code)) + } else if (generationId != group.generationId) { + responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) + } else if (!offsetMetadata.keySet.subsetOf(group.get(consumerId).assignedTopicPartitions)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code)) + } else { + offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback) + } + } + } + } + } + + def handleFetchOffsets(groupId: String, + partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = { + if (!isActive.get) { + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap + } else if (!isCoordinatorForGroup(groupId)) { + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap + } else { + val group = coordinatorMetadata.getGroup(groupId) + if (group == null) { + // if the group does not exist, it means this group is not relying + // on Kafka for partition management, and hence never send join-group + // request to the coordinator before; in this case blindly fetch the offsets + offsetManager.getOffsets(groupId, partitions) + } else { + group synchronized { + if (group.is(Dead)) { + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)}.toMap + } else { + offsetManager.getOffsets(groupId, partitions) + } + } + } + } + } + + def handleGroupImmigration(offsetTopicPartitionId: Int) = { + // TODO we may need to add more logic in KAFKA-2017 + offsetManager.loadOffsetsFromLog(offsetTopicPartitionId) + } + + def handleGroupEmigration(offsetTopicPartitionId: Int) = { + // TODO we may need to add more logic in KAFKA-2017 + offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId) + } + /** * Complete existing DelayedHeartbeats for the given consumer and schedule the next one */ @@ -246,8 +341,7 @@ class ConsumerCoordinator(val config: KafkaConfig, private def prepareRebalance(group: ConsumerGroupMetadata) { group.transitionTo(PreparingRebalance) - group.generationId += 1 - info("Preparing to rebalance group %s generation %s".format(group.groupId, group.generationId)) + info("Preparing to rebalance group %s with old generation %s".format(group.groupId, group.generationId)) val rebalanceTimeout = group.rebalanceTimeout val delayedRebalance = new DelayedRebalance(this, group, rebalanceTimeout) @@ -259,7 +353,9 @@ class ConsumerCoordinator(val config: KafkaConfig, assert(group.notYetRejoinedConsumers == List.empty[ConsumerMetadata]) group.transitionTo(Rebalancing) - info("Rebalancing group %s generation %s".format(group.groupId, group.generationId)) + group.generationId += 1 + + info("Rebalancing group %s with new generation %s".format(group.groupId, group.generationId)) val assignedPartitionsPerConsumer = reassignPartitions(group) trace("Rebalance for group %s generation %s has assigned partitions: %s" @@ -275,8 +371,6 @@ class ConsumerCoordinator(val config: KafkaConfig, maybePrepareRebalance(group) } - private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId)) - private def reassignPartitions(group: ConsumerGroupMetadata) = { val assignor = PartitionAssignor.createInstance(group.partitionAssignmentStrategy) val topicsPerConsumer = group.topicsPerConsumer @@ -345,8 +439,54 @@ class ConsumerCoordinator(val config: KafkaConfig, } } - def onCompleteHeartbeat() {} + def onCompleteHeartbeat() { + // TODO: add metrics for complete heartbeats + } + + def partitionFor(group: String): Int = offsetManager.partitionFor(group) private def shouldKeepConsumerAlive(consumer: ConsumerMetadata, heartbeatDeadline: Long) = consumer.awaitingRebalanceCallback != null || consumer.latestHeartbeat + consumer.sessionTimeoutMs > heartbeatDeadline + + private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId)) +} + +object ConsumerCoordinator { + + val OffsetsTopicName = "__consumer_offsets" + + def create(config: KafkaConfig, + zkClient: ZkClient, + replicaManager: ReplicaManager, + kafkaScheduler: KafkaScheduler): ConsumerCoordinator = { + val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, + loadBufferSize = config.offsetsLoadBufferSize, + offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, + offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, + offsetsTopicNumPartitions = config.offsetsTopicPartitions, + offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, + offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, + offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) + val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs, + consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs) + + new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkClient, kafkaScheduler) + } + + def create(config: KafkaConfig, + zkClient: ZkClient, + offsetManager: OffsetManager): ConsumerCoordinator = { + val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, + loadBufferSize = config.offsetsLoadBufferSize, + offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, + offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, + offsetsTopicNumPartitions = config.offsetsTopicPartitions, + offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, + offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, + offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) + val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs, + consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs) + + new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkClient) + } } diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala index 0cd5605bcca..2920320d290 100644 --- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala @@ -32,7 +32,7 @@ import scala.collection.mutable * It delegates all group logic to the callers. */ @threadsafe -private[coordinator] class CoordinatorMetadata(config: KafkaConfig, +private[coordinator] class CoordinatorMetadata(brokerId: Int, zkClient: ZkClient, maybePrepareRebalance: ConsumerGroupMetadata => Unit) { @@ -179,7 +179,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig, * Zookeeper listener to handle topic partition changes */ class TopicPartitionChangeListener extends IZkDataListener with Logging { - this.logIdent = "[TopicPartitionChangeListener on Coordinator " + config.brokerId + "]: " + this.logIdent = "[TopicPartitionChangeListener on Coordinator " + brokerId + "]: " override def handleDataChange(dataPath: String, data: Object) { info("Handling data change for path: %s data: %s".format(dataPath, data)) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ad6f05807c6..18f5b5b895a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -37,7 +37,6 @@ import org.I0Itec.zkclient.ZkClient */ class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, - val offsetManager: OffsetManager, val coordinator: ConsumerCoordinator, val controller: KafkaController, val zkClient: ZkClient, @@ -95,8 +94,23 @@ class KafkaApis(val requestChannel: RequestChannel, // stop serving data to clients for the topic being deleted val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] try { - val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager) - val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) + // call replica manager to handle updating partitions to become leader or follower + val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest) + val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode) + // for each new leader or follower, call coordinator to handle + // consumer group migration + result.updatedLeaders.foreach { case partition => + if (partition.topic == ConsumerCoordinator.OffsetsTopicName) + coordinator.handleGroupImmigration(partition.partitionId) + } + result.updatedFollowers.foreach { case partition => + partition.leaderReplicaIdOpt.foreach { leaderReplica => + if (partition.topic == ConsumerCoordinator.OffsetsTopicName && + leaderReplica == brokerId) + coordinator.handleGroupEmigration(partition.partitionId) + } + } + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, leaderAndIsrResponse))) } catch { case e: KafkaStorageException => @@ -142,6 +156,12 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetCommitRequest(request: RequestChannel.Request) { val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + // filter non-exist topics + val invalidRequestsInfo = offsetCommitRequest.requestInfo.filter { case (topicAndPartition, offsetMetadata) => + !metadataCache.contains(topicAndPartition.topic) + } + val filteredRequestInfo = (offsetCommitRequest.requestInfo -- invalidRequestsInfo.keys) + // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) { commitStatus.foreach { case (topicAndPartition, errorCode) => @@ -154,14 +174,14 @@ class KafkaApis(val requestChannel: RequestChannel, topicAndPartition, ErrorMapping.exceptionNameFor(errorCode))) } } - - val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId) + val combinedCommitStatus = commitStatus ++ invalidRequestsInfo.map(_._1 -> ErrorMapping.UnknownTopicOrPartitionCode) + val response = OffsetCommitResponse(combinedCommitStatus, offsetCommitRequest.correlationId) requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } if (offsetCommitRequest.versionId == 0) { // for version 0 always store offsets to ZK - val responseInfo = offsetCommitRequest.requestInfo.map { + val responseInfo = filteredRequestInfo.map { case (topicAndPartition, metaAndError) => { val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic) try { @@ -189,7 +209,7 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetRetention = if (offsetCommitRequest.versionId <= 1 || offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) { - offsetManager.config.offsetsRetentionMs + coordinator.offsetConfig.offsetsRetentionMs } else { offsetCommitRequest.retentionMs } @@ -203,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel, val currentTimestamp = SystemTime.milliseconds val defaultExpireTimestamp = offsetRetention + currentTimestamp - val offsetData = offsetCommitRequest.requestInfo.mapValues(offsetAndMetadata => + val offsetData = filteredRequestInfo.mapValues(offsetAndMetadata => offsetAndMetadata.copy( commitTimestamp = currentTimestamp, expireTimestamp = { @@ -215,8 +235,8 @@ class KafkaApis(val requestChannel: RequestChannel, ) ) - // call offset manager to store offsets - offsetManager.storeOffsets( + // call coordinator to handle commit offset + coordinator.handleCommitOffsets( offsetCommitRequest.groupId, offsetCommitRequest.consumerId, offsetCommitRequest.groupGenerationId, @@ -422,9 +442,9 @@ class KafkaApis(val requestChannel: RequestChannel, if (topics.size > 0 && topicResponses.size != topics.size) { val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet val responsesForNonExistentTopics = nonExistentTopics.map { topic => - if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) { + if (topic == ConsumerCoordinator.OffsetsTopicName || config.autoCreateTopicsEnable) { try { - if (topic == OffsetManager.OffsetsTopicName) { + if (topic == ConsumerCoordinator.OffsetsTopicName) { val aliveBrokers = metadataCache.getAliveBrokers val offsetsTopicReplicationFactor = if (aliveBrokers.length > 0) @@ -433,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel, config.offsetsTopicReplicationFactor.toInt AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor, - offsetManager.offsetsTopicConfig) + coordinator.offsetsTopicConfigs) info("Auto creation of topic %s with %d partitions and replication factor %d is successful!" .format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor)) } @@ -496,26 +516,19 @@ class KafkaApis(val requestChannel: RequestChannel, OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId) } else { - // version 1 reads offsets from Kafka - val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition => - metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty - ) - val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap - val knownStatus = - if (knownTopicPartitions.size > 0) - offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap - else - Map.empty[TopicAndPartition, OffsetMetadataAndError] - val status = unknownStatus ++ knownStatus + // version 1 reads offsets from Kafka; + val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, offsetFetchRequest.requestInfo).toMap - OffsetFetchResponse(status, offsetFetchRequest.correlationId) + // Note that we do not need to filter the partitions in the + // metadata cache as the topic partitions will be filtered + // in coordinator's offset manager through the offset cache + OffsetFetchResponse(offsets, offsetFetchRequest.correlationId) } trace("Sending offset fetch response %s for correlation id %d to client %s." .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId)) requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) - } /* @@ -524,10 +537,10 @@ class KafkaApis(val requestChannel: RequestChannel, def handleConsumerMetadataRequest(request: RequestChannel.Request) { val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest] - val partition = offsetManager.partitionFor(consumerMetadataRequest.group) + val partition = coordinator.partitionFor(consumerMetadataRequest.group) // get metadata (and create the topic if necessary) - val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol).head + val offsetsTopicMetadata = getTopicMetadata(Set(ConsumerCoordinator.OffsetsTopicName), request.securityProtocol).head val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 52dc728bb1a..18917bc4464 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -41,7 +41,7 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker import kafka.network.{BlockingChannel, SocketServer} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge -import kafka.coordinator.ConsumerCoordinator +import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator} /** * Represents the lifecycle of a single Kafka broker. Handles all functionality required @@ -75,8 +75,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var logManager: LogManager = null - var offsetManager: OffsetManager = null - var replicaManager: ReplicaManager = null var topicConfigManager: TopicConfigManager = null @@ -157,19 +155,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) replicaManager.startup() - /* start offset manager */ - offsetManager = createOffsetManager() - /* start kafka controller */ kafkaController = new KafkaController(config, zkClient, brokerState) kafkaController.startup() /* start kafka coordinator */ - consumerCoordinator = new ConsumerCoordinator(config, zkClient, offsetManager) + consumerCoordinator = ConsumerCoordinator.create(config, zkClient, replicaManager, kafkaScheduler) consumerCoordinator.startup() /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator, + apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator, kafkaController, zkClient, config.brokerId, config, metadataCache) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) @@ -349,8 +344,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg CoreUtils.swallow(socketServer.shutdown()) if(requestHandlerPool != null) CoreUtils.swallow(requestHandlerPool.shutdown()) - if(offsetManager != null) - offsetManager.shutdown() CoreUtils.swallow(kafkaScheduler.shutdown()) if(apis != null) CoreUtils.swallow(apis.close()) @@ -450,19 +443,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg logProps } - private def createOffsetManager(): OffsetManager = { - val offsetManagerConfig = OffsetManagerConfig( - maxMetadataSize = config.offsetMetadataMaxSize, - loadBufferSize = config.offsetsLoadBufferSize, - offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, - offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, - offsetsTopicNumPartitions = config.offsetsTopicPartitions, - offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, - offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, - offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) - new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler, metadataCache) - } - /** * Generates new brokerId or reads from meta.properties based on following conditions *
    diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala index 5cca85cf727..47b6ce93da3 100755 --- a/core/src/main/scala/kafka/server/OffsetManager.scala +++ b/core/src/main/scala/kafka/server/OffsetManager.scala @@ -17,6 +17,7 @@ package kafka.server +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.types.{Struct, Schema, Field} import org.apache.kafka.common.protocol.types.Type.STRING import org.apache.kafka.common.protocol.types.Type.INT32 @@ -25,19 +26,19 @@ import org.apache.kafka.common.utils.Utils import kafka.utils._ import kafka.common._ -import kafka.log.{FileMessageSet, LogConfig} +import kafka.log.FileMessageSet import kafka.message._ import kafka.metrics.KafkaMetricsGroup import kafka.common.TopicAndPartition import kafka.tools.MessageFormatter import kafka.api.ProducerResponseStatus +import kafka.coordinator.ConsumerCoordinator import scala.Some import scala.collection._ import java.io.PrintStream import java.util.concurrent.atomic.AtomicBoolean import java.nio.ByteBuffer -import java.util.Properties import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Gauge @@ -87,8 +88,7 @@ object OffsetManagerConfig { class OffsetManager(val config: OffsetManagerConfig, replicaManager: ReplicaManager, zkClient: ZkClient, - scheduler: Scheduler, - metadataCache: MetadataCache) extends Logging with KafkaMetricsGroup { + scheduler: Scheduler) extends Logging with KafkaMetricsGroup { /* offsets and metadata cache */ private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata] @@ -143,9 +143,9 @@ class OffsetManager(val config: OffsetManagerConfig, // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say, // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles. tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) => - val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition) + val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition) partitionOpt.map { partition => - val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition) + val appendPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition) val messages = tombstones.map(_._2).toSeq trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition)) @@ -170,14 +170,6 @@ class OffsetManager(val config: OffsetManagerConfig, } - def offsetsTopicConfig: Properties = { - val props = new Properties - props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString) - props.put(LogConfig.CleanupPolicyProp, "compact") - props.put(LogConfig.CompressionTypeProp, "uncompressed") - props - } - def partitionFor(group: String): Int = Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions /** @@ -214,22 +206,14 @@ class OffsetManager(val config: OffsetManagerConfig, /** * Store offsets by appending it to the replicated log and then inserting to cache */ - // TODO: generation id and consumer id is needed by coordinator to do consumer checking in the future def storeOffsets(groupId: String, consumerId: String, generationId: Int, offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { - // check if there are any non-existent topics - val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) => - !metadataCache.contains(topicAndPartition.topic) - } - - // first filter out partitions with offset metadata size exceeding limit or - // if its a non existing topic - // TODO: in the future we may want to only support atomic commit and hence fail the whole commit + // first filter out partitions with offset metadata size exceeding limit val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) => - validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition) + validateOffsetMetadataLength(offsetAndMetadata.metadata) } // construct the message set to append @@ -240,7 +224,7 @@ class OffsetManager(val config: OffsetManagerConfig, ) }.toSeq - val offsetTopicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionFor(groupId)) + val offsetTopicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, partitionFor(groupId)) val offsetsAndMetadataMessageSet = Map(offsetTopicPartition -> new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) @@ -271,6 +255,10 @@ class OffsetManager(val config: OffsetManagerConfig, ErrorMapping.ConsumerCoordinatorNotAvailableCode else if (status.error == ErrorMapping.NotLeaderForPartitionCode) ErrorMapping.NotCoordinatorForConsumerCode + else if (status.error == ErrorMapping.MessageSizeTooLargeCode + || status.error == ErrorMapping.MessageSetSizeTooLargeCode + || status.error == ErrorMapping.InvalidFetchSizeCode) + Errors.INVALID_COMMIT_OFFSET_SIZE.code else status.error } @@ -278,9 +266,7 @@ class OffsetManager(val config: OffsetManagerConfig, // compute the final error codes for the commit response val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => - if (nonExistentTopics.contains(topicAndPartition)) - (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode) - else if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) + if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) (topicAndPartition, responseCode) else (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) @@ -338,7 +324,7 @@ class OffsetManager(val config: OffsetManagerConfig, debug("Could not fetch offsets for group %s (not offset coordinator).".format(group)) topicPartitions.map { topicAndPartition => val groupTopicPartition = GroupTopicPartition(group, topicAndPartition) - (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotOffsetManagerForGroup) + (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup) }.toMap } } @@ -349,7 +335,7 @@ class OffsetManager(val config: OffsetManagerConfig, */ def loadOffsetsFromLog(offsetsPartition: Int) { - val topicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition) + val topicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition) loadingPartitions synchronized { if (loadingPartitions.contains(offsetsPartition)) { @@ -421,7 +407,7 @@ class OffsetManager(val config: OffsetManagerConfig, } private def getHighWatermark(partitionId: Int): Long = { - val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, partitionId) + val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, partitionId) val hw = partitionOpt.map { partition => partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L) @@ -449,7 +435,7 @@ class OffsetManager(val config: OffsetManagerConfig, } if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition." - .format(numRemoved, TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition))) + .format(numRemoved, TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition))) } @@ -461,8 +447,6 @@ class OffsetManager(val config: OffsetManagerConfig, object OffsetManager { - val OffsetsTopicName = "__consumer_offsets" - private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema) private val CURRENT_OFFSET_SCHEMA_VERSION = 1.toShort diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 59c9bc3ac3a..795220e7f63 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -23,19 +23,19 @@ import kafka.cluster.{BrokerEndPoint, Partition, Replica} import kafka.log.{LogAppendInfo, LogManager} import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController -import kafka.common.TopicAndPartition import kafka.message.{ByteBufferMessageSet, MessageSet} +import kafka.api.ProducerResponseStatus +import kafka.common.TopicAndPartition +import kafka.api.PartitionFetchInfo + +import org.apache.kafka.common.protocol.Errors import java.util.concurrent.atomic.AtomicBoolean import java.io.{IOException, File} import java.util.concurrent.TimeUnit -import org.apache.kafka.common.protocol.Errors -import scala.Predef._ +import scala.Some import scala.collection._ -import scala.collection.mutable.HashMap -import scala.collection.Map -import scala.collection.Set import org.I0Itec.zkclient.ZkClient import com.yammer.metrics.core.Gauge @@ -84,6 +84,17 @@ object LogReadResult { false) } +case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[(String, Int), Short], + updatedLeaders: Set[Partition], + updatedFollowers: Set[Partition], + errorCode: Short) { + + override def toString = { + "updated leaders: [%s], updated followers: [%s], update results: [%s], global error: [%d]" + .format(updatedLeaders, updatedFollowers, responseMap, errorCode) + } +} + object ReplicaManager { val HighWatermarkFilename = "replication-offset-checkpoint" } @@ -393,10 +404,10 @@ class ReplicaManager(val config: KafkaConfig, (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe))) case nle: NotLeaderForPartitionException => (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle))) - case mtl: MessageSizeTooLargeException => - (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtl))) - case mstl: MessageSetSizeTooLargeException => - (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstl))) + case mtle: MessageSizeTooLargeException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle))) + case mstle: MessageSetSizeTooLargeException => + (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle))) case imse : InvalidMessageSizeException => (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse))) case t: Throwable => @@ -416,7 +427,7 @@ class ReplicaManager(val config: KafkaConfig, def fetchMessages(timeout: Long, replicaId: Int, fetchMinBytes: Int, - fetchInfo: Map[TopicAndPartition, PartitionFetchInfo], + fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo], responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) { val isFromFollower = replicaId >= 0 @@ -544,30 +555,29 @@ class ReplicaManager(val config: KafkaConfig, } } - def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest, - offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = { + def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): BecomeLeaderOrFollowerResult = { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]" .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition)) } replicaStateChangeLock synchronized { - val responseMap = new collection.mutable.HashMap[(String, Int), Short] - if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { + val responseMap = new mutable.HashMap[(String, Int), Short] + if (leaderAndISRRequest.controllerEpoch < controllerEpoch) { leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) => stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " + "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId, leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch)) } - (responseMap, ErrorMapping.StaleControllerEpochCode) + BecomeLeaderOrFollowerResult(responseMap, Set.empty[Partition], Set.empty[Partition], ErrorMapping.StaleControllerEpochCode) } else { val controllerId = leaderAndISRRequest.controllerId val correlationId = leaderAndISRRequest.correlationId controllerEpoch = leaderAndISRRequest.controllerEpoch // First check partition's leader epoch - val partitionState = new HashMap[Partition, PartitionStateInfo]() - leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) => + val partitionState = new mutable.HashMap[Partition, PartitionStateInfo]() + leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partitionId), partitionStateInfo) => val partition = getOrCreatePartition(topic, partitionId) val partitionLeaderEpoch = partition.getLeaderEpoch() // If the leader epoch is valid record the epoch of the controller that made the leadership decision. @@ -591,14 +601,19 @@ class ReplicaManager(val config: KafkaConfig, } } - val partitionsTobeLeader = partitionState - .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId} + val partitionsTobeLeader = partitionState.filter { case (partition, partitionStateInfo) => + partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId + } val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys) - if (!partitionsTobeLeader.isEmpty) - makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager) - if (!partitionsToBeFollower.isEmpty) - makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager) + val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty) + makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap) + else + Set.empty[Partition] + val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty) + makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap) + else + Set.empty[Partition] // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions // have been completely populated before starting the checkpointing there by avoiding weird race conditions @@ -607,7 +622,7 @@ class ReplicaManager(val config: KafkaConfig, hwThreadInitialized = true } replicaFetcherManager.shutdownIdleFetcherThreads() - (responseMap, ErrorMapping.NoError) + BecomeLeaderOrFollowerResult(responseMap, partitionsBecomeLeader, partitionsBecomeFollower, ErrorMapping.NoError) } } } @@ -623,10 +638,11 @@ class ReplicaManager(val config: KafkaConfig, * the error message will be set on each partition since we do not know which partition caused it * TODO: the above may need to be fixed later */ - private def makeLeaders(controllerId: Int, epoch: Int, + private def makeLeaders(controllerId: Int, + epoch: Int, partitionState: Map[Partition, PartitionStateInfo], - correlationId: Int, responseMap: mutable.Map[(String, Int), Short], - offsetManager: OffsetManager) = { + correlationId: Int, + responseMap: mutable.Map[(String, Int), Short]): Set[Partition] = { partitionState.foreach(state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-leader transition for partition %s") @@ -645,7 +661,7 @@ class ReplicaManager(val config: KafkaConfig, } // Update the partition information to be the leader partitionState.foreach{ case (partition, partitionStateInfo) => - partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)} + partition.makeLeader(controllerId, partitionStateInfo, correlationId)} } catch { case e: Throwable => @@ -664,6 +680,8 @@ class ReplicaManager(val config: KafkaConfig, "for the become-leader transition for partition %s") .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) } + + partitionState.keySet } /* @@ -682,9 +700,12 @@ class ReplicaManager(val config: KafkaConfig, * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where * the error message will be set on each partition since we do not know which partition caused it */ - private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo], - leaders: Set[BrokerEndPoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short], - offsetManager: OffsetManager) { + private def makeFollowers(controllerId: Int, + epoch: Int, + partitionState: Map[Partition, PartitionStateInfo], + leaders: Set[BrokerEndPoint], + correlationId: Int, + responseMap: mutable.Map[(String, Int), Short]) : Set[Partition] = { partitionState.foreach { state => stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-follower transition for partition %s") @@ -694,18 +715,18 @@ class ReplicaManager(val config: KafkaConfig, for (partition <- partitionState.keys) responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError) + val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set() + try { - var partitionsToMakeFollower: Set[Partition] = Set() - - // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 + // TODO: Delete leaders from LeaderAndIsrRequest partitionState.foreach{ case (partition, partitionStateInfo) => val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader leaders.find(_.id == newLeaderBrokerId) match { // Only change partition state when the leader is available case Some(leaderBroker) => - if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager)) + if (partition.makeFollower(controllerId, partitionStateInfo, correlationId)) partitionsToMakeFollower += partition else stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " + @@ -775,6 +796,8 @@ class ReplicaManager(val config: KafkaConfig, "for the become-follower transition for partition %s") .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))) } + + partitionsToMakeFollower } private def maybeShrinkIsr(): Unit = { diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index 17b17b9b152..92ffb91b5e0 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -25,12 +25,13 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.consumer.NoOffsetForPartitionException import kafka.utils.{TestUtils, Logging} -import kafka.server.{KafkaConfig, OffsetManager} +import kafka.server.KafkaConfig import java.util.ArrayList import org.junit.Assert._ import scala.collection.JavaConversions._ +import kafka.coordinator.ConsumerCoordinator /** @@ -158,9 +159,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging { consumer0.poll(50) // get metadata for the topic - var parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName) + var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName) while(parts == null) - parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName) + parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName) assertEquals(1, parts.size) assertNotNull(parts(0).leader()) diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 07b1ff47bfc..afcc349342d 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.KafkaProducer import kafka.server.{OffsetManager, KafkaConfig} import kafka.integration.KafkaServerTestHarness import scala.collection.mutable.Buffer +import kafka.coordinator.ConsumerCoordinator /** * A helper class for writing integration tests that involve producers, consumers, and servers @@ -63,11 +64,11 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { consumers += new KafkaConsumer(consumerConfig) // create the consumer offset topic - TestUtils.createTopic(zkClient, OffsetManager.OffsetsTopicName, - serverConfig.getProperty("offsets.topic.num.partitions").toInt, - serverConfig.getProperty("offsets.topic.replication.factor").toInt, + TestUtils.createTopic(zkClient, ConsumerCoordinator.OffsetsTopicName, + serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt, + serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt, servers, - servers(0).offsetManager.offsetsTopicConfig) + servers(0).consumerCoordinator.offsetsTopicConfigs) } override def tearDown() { diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index c7136f20972..dcd69881445 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -22,9 +22,9 @@ import org.scalatest.junit.JUnit3Suite import kafka.utils.Logging import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness -import kafka.server.{OffsetManager, KafkaConfig} import kafka.admin.TopicCommand.TopicCommandOptions import kafka.utils.ZkUtils +import kafka.coordinator.ConsumerCoordinator class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging { @@ -87,12 +87,12 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin // create the offset topic val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, "--replication-factor", "1", - "--topic", OffsetManager.OffsetsTopicName)) + "--topic", ConsumerCoordinator.OffsetsTopicName)) TopicCommand.createTopic(zkClient, createOffsetTopicOpts) // try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't - val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", OffsetManager.OffsetsTopicName)) - val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(OffsetManager.OffsetsTopicName) + val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", ConsumerCoordinator.OffsetsTopicName)) + val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName) assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deleteOffsetTopicPath)) intercept[AdminOperationException] { TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts) diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala index 4f124af5c3e..4b326d090c9 100644 --- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala @@ -22,6 +22,7 @@ import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.Test import kafka.server.OffsetManager +import kafka.coordinator.ConsumerCoordinator class TopicFilterTest extends JUnitSuite { @@ -37,8 +38,8 @@ class TopicFilterTest extends JUnitSuite { val topicFilter2 = new Whitelist(".+") assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true)) - assertFalse(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true)) - assertTrue(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false)) + assertFalse(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true)) + assertTrue(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false)) val topicFilter3 = new Whitelist("white_listed-topic.+") assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true)) @@ -57,8 +58,8 @@ class TopicFilterTest extends JUnitSuite { assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true)) assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false)) - assertFalse(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true)) - assertTrue(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false)) + assertFalse(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true)) + assertTrue(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false)) } @Test diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala index a44fbd653b5..3cd726d291d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala @@ -22,8 +22,8 @@ import java.util.concurrent.TimeUnit import junit.framework.Assert._ import kafka.common.TopicAndPartition -import kafka.server.{KafkaConfig, OffsetManager} -import kafka.utils.TestUtils +import kafka.server.{OffsetManager, ReplicaManager, KafkaConfig} +import kafka.utils.{KafkaScheduler, TestUtils} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.JoinGroupRequest import org.easymock.EasyMock @@ -45,8 +45,8 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite { val ConsumerMinSessionTimeout = 10 val ConsumerMaxSessionTimeout = 30 val DefaultSessionTimeout = 20 - var offsetManager: OffsetManager = null var consumerCoordinator: ConsumerCoordinator = null + var offsetManager : OffsetManager = null @Before def setUp() { @@ -54,12 +54,13 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite { props.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString) props.setProperty(KafkaConfig.ConsumerMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString) offsetManager = EasyMock.createStrictMock(classOf[OffsetManager]) - consumerCoordinator = new ConsumerCoordinator(KafkaConfig.fromProps(props), null, offsetManager) + consumerCoordinator = ConsumerCoordinator.create(KafkaConfig.fromProps(props), null, offsetManager) consumerCoordinator.startup() } @After def tearDown() { + EasyMock.reset(offsetManager) consumerCoordinator.shutdown() } diff --git a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala index 08854c5e6ec..2cbf6e251ad 100644 --- a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala @@ -40,7 +40,7 @@ class CoordinatorMetadataTest extends JUnitSuite { def setUp() { val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") zkClient = EasyMock.createStrictMock(classOf[ZkClient]) - coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props), zkClient, null) + coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkClient, null) } @Test diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index 528525b719e..39a68526c8b 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -120,7 +120,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { val fetchRequest2 = OffsetFetchRequest(group, Seq(unknownTopicAndPartition)) val fetchResponse2 = simpleConsumer.fetchOffsets(fetchRequest2) - assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get) + assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get) assertEquals(1, fetchResponse2.requestInfo.size) } @@ -166,14 +166,14 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness { assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error) - assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error) - assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get) + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error) + assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get) assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error) assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get) - assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error) - assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get) + assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error) + assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get) assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata) assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata)