KAFKA-4682; Revise expiration semantics of consumer group offsets (KIP-211 - Part 1) (#4896)

This patch contains the improved offset expiration semantics proposed in KIP-211. Committed offsets will not be expired as long as a group is active. Once all members have left the group, then offsets will be expired after the timeout configured by `offsets.retention.minutes`. Note that the optimization for early expiration of unsubscribed topics will be implemented in a separate patch.
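
In short: under KIP-211 an offset becomes eligible for expiration only once its group has no members, and only after the retention window has elapsed since the group became empty. A minimal sketch of that rule, with hypothetical names (the actual check lands in GroupMetadata.removeExpiredOffsets below):

    // Sketch of the KIP-211 rule; names are illustrative, not the patch's API.
    object ExpirationRule {
      def isExpirable(groupHasMembers: Boolean,
                      emptySinceMs: Long,    // when the group last became Empty
                      retentionMs: Long,     // offsets.retention.minutes, in millis
                      nowMs: Long): Boolean =
        !groupHasMembers && (nowMs - emptySinceMs >= retentionMs)
    }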
Authored by Vahid Hashemian on 2018-06-21 17:19:24 -07:00; committed by Jason Gustafson.
parent e33ccb628e
commit 418a91b5d4
18 changed files with 545 additions and 159 deletions

ConsumerCoordinator.java

@@ -798,8 +798,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData).
                 setGenerationId(generation.generationId).
-                setMemberId(generation.memberId).
-                setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);
+                setMemberId(generation.memberId);
 
         log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);

OffsetCommitRequest.java

@@ -111,9 +111,15 @@ public class OffsetCommitRequest extends AbstractRequest {
      */
     private static final Schema OFFSET_COMMIT_REQUEST_V4 = OFFSET_COMMIT_REQUEST_V3;
 
+    private static final Schema OFFSET_COMMIT_REQUEST_V5 = new Schema(
+            GROUP_ID,
+            GENERATION_ID,
+            MEMBER_ID,
+            new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2), "Topics to commit offsets."));
+
     public static Schema[] schemaVersions() {
         return new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2,
-            OFFSET_COMMIT_REQUEST_V3, OFFSET_COMMIT_REQUEST_V4};
+            OFFSET_COMMIT_REQUEST_V3, OFFSET_COMMIT_REQUEST_V4, OFFSET_COMMIT_REQUEST_V5};
     }
 
     // default values for the current version
@@ -166,7 +172,6 @@ public class OffsetCommitRequest extends AbstractRequest {
         private final Map<TopicPartition, PartitionData> offsetData;
         private String memberId = DEFAULT_MEMBER_ID;
         private int generationId = DEFAULT_GENERATION_ID;
-        private long retentionTime = DEFAULT_RETENTION_TIME;
 
         public Builder(String groupId, Map<TopicPartition, PartitionData> offsetData) {
             super(ApiKeys.OFFSET_COMMIT);
@@ -184,11 +189,6 @@ public class OffsetCommitRequest extends AbstractRequest {
             return this;
         }
 
-        public Builder setRetentionTime(long retentionTime) {
-            this.retentionTime = retentionTime;
-            return this;
-        }
-
         @Override
         public OffsetCommitRequest build(short version) {
             switch (version) {
@@ -199,8 +199,8 @@ public class OffsetCommitRequest extends AbstractRequest {
                 case 2:
                 case 3:
                 case 4:
-                    long retentionTime = version == 1 ? DEFAULT_RETENTION_TIME : this.retentionTime;
-                    return new OffsetCommitRequest(groupId, generationId, memberId, retentionTime, offsetData, version);
+                case 5:
+                    return new OffsetCommitRequest(groupId, generationId, memberId, DEFAULT_RETENTION_TIME, offsetData, version);
                 default:
                     throw new UnsupportedVersionException("Unsupported version " + version);
             }
@@ -213,7 +213,6 @@ public class OffsetCommitRequest extends AbstractRequest {
                 append(", groupId=").append(groupId).
                 append(", memberId=").append(memberId).
                 append(", generationId=").append(generationId).
-                append(", retentionTime=").append(retentionTime).
                 append(", offsetData=").append(offsetData).
                 append(")");
             return bld.toString();
@@ -316,6 +315,7 @@ public class OffsetCommitRequest extends AbstractRequest {
                 return new OffsetCommitResponse(responseData);
             case 3:
             case 4:
+            case 5:
                 return new OffsetCommitResponse(throttleTimeMs, responseData);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",

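For illustration, a client on the new protocol builds the commit request without any retention field. The sketch below assumes the Builder API from the hunks above; the partition and offset values are hypothetical.

    // Sketch: building a v5 OffsetCommitRequest (no retention time on the wire).
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.requests.OffsetCommitRequest
    import scala.collection.JavaConverters._

    val tp = new TopicPartition("topic1", 0)    // hypothetical partition
    val offsetData = Map(tp -> new OffsetCommitRequest.PartitionData(42L, "metadata")).asJava
    val request = new OffsetCommitRequest.Builder("group1", offsetData)
      .setGenerationId(1)
      .setMemberId("consumer1")
      .build(5.toShort)    // v5 schema: [group_id, generation_id, member_id, topics]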
OffsetCommitResponse.java

@@ -85,9 +85,11 @@ public class OffsetCommitResponse extends AbstractResponse {
      */
     private static final Schema OFFSET_COMMIT_RESPONSE_V4 = OFFSET_COMMIT_RESPONSE_V3;
 
+    private static final Schema OFFSET_COMMIT_RESPONSE_V5 = OFFSET_COMMIT_RESPONSE_V4;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2,
-            OFFSET_COMMIT_RESPONSE_V3, OFFSET_COMMIT_RESPONSE_V4};
+            OFFSET_COMMIT_RESPONSE_V3, OFFSET_COMMIT_RESPONSE_V4, OFFSET_COMMIT_RESPONSE_V5};
     }
 
     private final Map<TopicPartition, Errors> responseData;

RequestResponseTest.java

@@ -141,9 +141,6 @@ public class RequestResponseTest {
         checkErrorResponse(createMetadataRequest(3, singletonList("topic1")), new UnknownServerException());
         checkResponse(createMetadataResponse(), 4);
         checkErrorResponse(createMetadataRequest(4, singletonList("topic1")), new UnknownServerException());
-        checkRequest(createOffsetCommitRequest(2));
-        checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
-        checkResponse(createOffsetCommitResponse(), 0);
         checkRequest(OffsetFetchRequest.forAllPartitions("group1"));
         checkErrorResponse(OffsetFetchRequest.forAllPartitions("group1"), new NotCoordinatorException("Not Coordinator"));
         checkRequest(createOffsetFetchRequest(0));
@@ -210,6 +207,16 @@
         checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException());
         checkRequest(createOffsetCommitRequest(1));
         checkErrorResponse(createOffsetCommitRequest(1), new UnknownServerException());
+        checkRequest(createOffsetCommitRequest(2));
+        checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
+        checkRequest(createOffsetCommitRequest(3));
+        checkErrorResponse(createOffsetCommitRequest(3), new UnknownServerException());
+        checkRequest(createOffsetCommitRequest(4));
+        checkErrorResponse(createOffsetCommitRequest(4), new UnknownServerException());
+        checkResponse(createOffsetCommitResponse(), 4);
+        checkRequest(createOffsetCommitRequest(5));
+        checkErrorResponse(createOffsetCommitRequest(5), new UnknownServerException());
+        checkResponse(createOffsetCommitResponse(), 5);
         checkRequest(createJoinGroupRequest(0));
         checkRequest(createUpdateMetadataRequest(0, null));
         checkErrorResponse(createUpdateMetadataRequest(0, null), new UnknownServerException());
@@ -817,7 +824,6 @@ public class RequestResponseTest {
         return new OffsetCommitRequest.Builder("group1", commitData)
                 .setGenerationId(100)
                 .setMemberId("consumer1")
-                .setRetentionTime(1000000)
                 .build((short) version);
     }

ApiVersion.scala

@@ -73,7 +73,9 @@ object ApiVersion {
     // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279
     KAFKA_2_0_IV0,
     // Introduced ApiVersionsRequest V2 via KIP-219
-    KAFKA_2_0_IV1
+    KAFKA_2_0_IV1,
+    // Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211)
+    KAFKA_2_1_IV0
   )
 
   // Map keys are the union of the short and full versions
@@ -248,4 +250,11 @@ case object KAFKA_2_0_IV1 extends DefaultApiVersion {
   val subVersion = "IV1"
   val recordVersion = RecordVersion.V2
   val id: Int = 16
 }
+
+case object KAFKA_2_1_IV0 extends DefaultApiVersion {
+  val shortVersion: String = "2.1"
+  val subVersion = "IV0"
+  val recordVersion = RecordVersion.V2
+  val id: Int = 18
+}

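The new inter-broker protocol version is what the broker compares against when choosing on-disk schemas. A rough illustration of that gating, mirroring the pattern used in GroupMetadataManager below (the helper name is hypothetical):

    // Sketch: IBP-gated schema selection; ApiVersions order by id, so `<` works.
    def offsetValueVersion(interBrokerProtocolVersion: ApiVersion): Short =
      if (interBrokerProtocolVersion < KAFKA_2_1_IV0) 1.toShort  // legacy value, has expire_timestamp
      else 2.toShort                                             // new value, no expire_timestamp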
OffsetMetadataAndError.scala

@@ -34,17 +34,17 @@ object OffsetMetadata {
 
 case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
                              commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
-                             expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {
+                             expireTimestamp: Option[Long] = None) {
 
   def offset = offsetMetadata.offset
 
   def metadata = offsetMetadata.metadata
 
-  override def toString = "[%s,CommitTime %d,ExpirationTime %d]".format(offsetMetadata, commitTimestamp, expireTimestamp)
+  override def toString = s"[$offsetMetadata,CommitTime $commitTimestamp,ExpirationTime ${expireTimestamp.getOrElse("_")}]"
 }
 
 object OffsetAndMetadata {
-  def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), commitTimestamp, expireTimestamp)
+  def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), commitTimestamp, Some(expireTimestamp))
 
   def apply(offset: Long, metadata: String, timestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), timestamp)

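With expireTimestamp now an Option, None marks an offset that follows the new group-level expiration, while Some(ts) preserves the legacy per-offset deadline. A small sketch assuming the definitions above (values illustrative):

    val newStyle = OffsetAndMetadata(OffsetMetadata(42L, "meta"), commitTimestamp = 1000L)
    // newStyle.expireTimestamp == None; toString renders ExpirationTime as "_"
    val legacy = OffsetAndMetadata(offset = 42L, metadata = "meta", commitTimestamp = 1000L, expireTimestamp = 2000L)
    // legacy.expireTimestamp == Some(2000L); the old expiration semantics still apply to it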
GroupCoordinator.scala

@@ -125,7 +125,7 @@ class GroupCoordinator(val brokerId: Int,
       if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
         responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
       } else {
-        val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
+        val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
         doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
       }
@@ -451,7 +451,7 @@
       case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
       case None =>
         val group = groupManager.getGroup(groupId).getOrElse {
-          groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
+          groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
         }
         doCommitOffsets(group, NoMemberId, NoGeneration, producerId, producerEpoch, offsetMetadata, responseCallback)
     }
@@ -469,7 +469,7 @@
       case None =>
         if (generationId < 0) {
           // the group is not relying on Kafka for group management, so allow the commit
-          val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
+          val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
           doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
             offsetMetadata, responseCallback)
         } else {

GroupMetadata.scala

@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.common.OffsetAndMetadata
 import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Seq, immutable, mutable}
@@ -118,12 +119,15 @@ private object GroupMetadata {
                 protocolType: String,
                 protocol: String,
                 leaderId: String,
-                members: Iterable[MemberMetadata]): GroupMetadata = {
-    val group = new GroupMetadata(groupId, initialState)
+                currentStateTimestamp: Option[Long],
+                members: Iterable[MemberMetadata],
+                time: Time): GroupMetadata = {
+    val group = new GroupMetadata(groupId, initialState, time)
     group.generationId = generationId
     group.protocolType = if (protocolType == null || protocolType.isEmpty) None else Some(protocolType)
     group.protocol = Option(protocol)
    group.leaderId = Option(leaderId)
+    group.currentStateTimestamp = currentStateTimestamp
     members.foreach(group.add)
     group
   }
@@ -167,10 +171,11 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs
  * 3. leader id
  */
 @nonthreadsafe
-private[group] class GroupMetadata(val groupId: String, initialState: GroupState) extends Logging {
+private[group] class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) extends Logging {
 
   private[group] val lock = new ReentrantLock
 
   private var state: GroupState = initialState
+  var currentStateTimestamp: Option[Long] = Some(time.milliseconds())
   var protocolType: Option[String] = None
   var generationId = 0
   private var leaderId: Option[String] = None
@@ -195,6 +200,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
   def leaderOrNull: String = leaderId.orNull
   def protocolOrNull: String = protocol.orNull
+  def currentStateTimestampOrDefault: Long = currentStateTimestamp.getOrElse(-1)
 
   def add(member: MemberMetadata) {
     if (members.isEmpty)
@@ -240,6 +246,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   def transitionTo(groupState: GroupState) {
     assertValidTransition(groupState)
     state = groupState
+    currentStateTimestamp = Some(time.milliseconds())
   }
 
   def selectProtocol: String = {
@@ -434,18 +441,51 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     }.toMap
   }
 
-  def removeExpiredOffsets(startMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {
-    val expiredOffsets = offsets
-      .filter {
+  def removeExpiredOffsets(currentTimestamp: Long, offsetRetentionMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {
+
+    def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
+      offsets.filter {
         case (topicPartition, commitRecordMetadataAndOffset) =>
-          commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
-      }
-      .map {
+          !pendingOffsetCommits.contains(topicPartition) && {
+            commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
+              case None =>
+                // current version with no per partition retention
+                currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
+              case Some(expireTimestamp) =>
+                // older versions with explicit expire_timestamp field => old expiration semantics is used
+                currentTimestamp >= expireTimestamp
+            }
+          }
+      }.map {
         case (topicPartition, commitRecordOffsetAndMetadata) =>
           (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)
-      }
+      }.toMap
+    }
+
+    val expiredOffsets: Map[TopicPartition, OffsetAndMetadata] = protocolType match {
+      case Some(_) if is(Empty) =>
+        // no consumer exists in the group =>
+        // - if current state timestamp exists and retention period has passed since group became Empty,
+        //   expire all offsets with no pending offset commit;
+        // - if there is no current state timestamp (old group metadata schema) and retention period has passed
+        //   since the last commit timestamp, expire the offset
+        getExpiredOffsets(commitRecordMetadataAndOffset =>
+          currentStateTimestamp.getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp))
+
+      case None =>
+        // protocolType is None => standalone (simple) consumer, that uses Kafka for offset storage only
+        // expire offsets with no pending offset commit that retention period has passed since their last commit
+        getExpiredOffsets(_.offsetAndMetadata.commitTimestamp)
+
+      case _ =>
+        Map()
+    }
+
+    if (expiredOffsets.nonEmpty)
+      debug(s"Expired offsets from group '$groupId': ${expiredOffsets.keySet}")
+
     offsets --= expiredOffsets.keySet
-    expiredOffsets.toMap
+    expiredOffsets
   }
 
   def allOffsets = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>

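Put together, the rewritten removeExpiredOffsets makes expiration a property of the whole group rather than of each committed offset. A hedged walk-through of the Empty-group path, assuming a caller inside the kafka.coordinator.group package, with the offset bookkeeping elided:

    import kafka.utils.MockTime

    val time = new MockTime()
    val retentionMs = 1000L                                  // stand-in for offsets.retention.minutes
    val group = new GroupMetadata("demo-group", Empty, time) // currentStateTimestamp = now
    // ... offsets committed, all members leave, group transitions back to Empty ... (elided)
    time.sleep(retentionMs + 1)                              // the retention window elapses
    val expired = group.removeExpiredOffsets(time.milliseconds(), retentionMs)
    // every offset without a pending commit is returned and removed from the cache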
GroupMetadataManager.scala

@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
-import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0}
 import kafka.common.{MessageFormatter, OffsetAndMetadata}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ReplicaManager
@@ -197,18 +197,11 @@ class GroupMetadataManager(brokerId: Int,
                  responseCallback: Errors => Unit): Unit = {
     getMagic(partitionFor(group.groupId)) match {
       case Some(magicValue) =>
-        val groupMetadataValueVersion = {
-          if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
-            0.toShort
-          else
-            GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
-        }
-
         // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
         val timestampType = TimestampType.CREATE_TIME
         val timestamp = time.milliseconds()
         val key = GroupMetadataManager.groupMetadataKey(group.groupId)
-        val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion)
+        val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
 
         val records = {
           val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
@@ -330,7 +323,7 @@
     val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
       val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition)
-      val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
+      val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
       new SimpleRecord(timestamp, key, value)
     }
     val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
@@ -580,7 +573,7 @@ class GroupMetadataManager(brokerId: Int,
           case groupMetadataKey: GroupMetadataKey =>
             // load group metadata
             val groupId = groupMetadataKey.key
-            val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+            val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
             if (groupMetadata != null) {
               removedGroups.remove(groupId)
               loadedGroups.put(groupId, groupMetadata)
@@ -630,7 +623,7 @@ class GroupMetadataManager(brokerId: Int,
     // load groups which store offsets in kafka, but which have no active members and thus no group
     // metadata stored in the log
     (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { groupId =>
-      val group = new GroupMetadata(groupId, initialState = Empty)
+      val group = new GroupMetadata(groupId, Empty, time)
       val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
       val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
       debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
@@ -653,18 +646,8 @@ class GroupMetadataManager(brokerId: Int,
                             pendingTransactionalOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]): Unit = {
     // offsets are initialized prior to loading the group into the cache to ensure that clients see a consistent
     // view of the group's offsets
-    val loadedOffsets = offsets.mapValues { case CommitRecordMetadataAndOffset(commitRecordOffset, offsetAndMetadata) =>
-      // special handling for version 0:
-      // set the expiration time stamp as commit time stamp + server default retention time
-      val updatedOffsetAndMetadata =
-        if (offsetAndMetadata.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
-          offsetAndMetadata.copy(expireTimestamp = offsetAndMetadata.commitTimestamp + config.offsetsRetentionMs)
-        else
-          offsetAndMetadata
-      CommitRecordMetadataAndOffset(commitRecordOffset, updatedOffsetAndMetadata)
-    }
-    trace(s"Initialized offsets $loadedOffsets for group ${group.groupId}")
-    group.initializeOffsets(loadedOffsets, pendingTransactionalOffsets.toMap)
+    trace(s"Initialized offsets $offsets for group ${group.groupId}")
+    group.initializeOffsets(offsets, pendingTransactionalOffsets.toMap)
 
     val currentGroup = addGroup(group)
     if (group != currentGroup)
@@ -711,11 +694,11 @@ class GroupMetadataManager(brokerId: Int,
   // visible for testing
   private[group] def cleanupGroupMetadata(): Unit = {
-    val startMs = time.milliseconds()
-    val offsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
-      group.removeExpiredOffsets(time.milliseconds())
+    val currentTimestamp = time.milliseconds()
+    val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
+      group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
     })
-    info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
+    info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
   }
 
   /**
@@ -917,7 +900,7 @@ class GroupMetadataManager(brokerId: Int,
  *    -> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]
  *
  * key version 2: group metadata
  *    -> value version 0: [protocol_type, generation, protocol, leader, members]
  */
 object GroupMetadataManager {
@@ -947,6 +930,13 @@ object GroupMetadataManager {
   private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
   private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
 
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
+    new Field("metadata", STRING, "Associated metadata.", ""),
+    new Field("commit_timestamp", INT64))
+  private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
+  private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
+  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
+
   private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
   private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
@@ -975,10 +965,13 @@
     new Field(SUBSCRIPTION_KEY, BYTES),
     new Field(ASSIGNMENT_KEY, BYTES))
 
+  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
+
   private val PROTOCOL_TYPE_KEY = "protocol_type"
   private val GENERATION_KEY = "generation"
   private val PROTOCOL_KEY = "protocol"
   private val LEADER_KEY = "leader"
+  private val CURRENT_STATE_TIMESTAMP_KEY = "current_state_timestamp"
   private val MEMBERS_KEY = "members"
 
   private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
@@ -995,6 +988,14 @@ object GroupMetadataManager {
     new Field(LEADER_KEY, NULLABLE_STRING),
     new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
 
+  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
+    new Field(PROTOCOL_TYPE_KEY, STRING),
+    new Field(GENERATION_KEY, INT32),
+    new Field(PROTOCOL_KEY, NULLABLE_STRING),
+    new Field(LEADER_KEY, NULLABLE_STRING),
+    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
+    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
+
   // map of versions to key schemas as data types
   private val MESSAGE_TYPE_SCHEMAS = Map(
@@ -1005,19 +1006,20 @@
   // map of version of offset value schemas
   private val OFFSET_VALUE_SCHEMAS = Map(
     0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
-    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
-  private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
+    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
+    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2)
 
   // map of version of group metadata value schemas
   private val GROUP_VALUE_SCHEMAS = Map(
     0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
-    1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
-  private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
+    1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
+    2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
+  private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 2.toShort
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
 
-  private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+  private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(2)
   private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
 
   private def schemaForKey(version: Int) = {
@@ -1081,17 +1083,34 @@
    * Generates the payload for offset commit message from given offset and metadata
    *
    * @param offsetAndMetadata consumer's current offset and metadata
+   * @param apiVersion the api version
    * @return payload for offset commit message
    */
-  private[group] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
-    // generate commit value with schema version 1
-    val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
-    value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
-    value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
-    value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
-    value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
+  private[group] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
+                                       apiVersion: ApiVersion): Array[Byte] = {
+    // generate commit value according to schema version
+    val (version, value) = {
+      if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty)
+        // if an older version of the API is used, or if an explicit expiration is provided, use the older schema
+        (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
+      else
+        (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
+    }
+
+    if (version == 2) {
+      value.set(OFFSET_VALUE_OFFSET_FIELD_V2, offsetAndMetadata.offset)
+      value.set(OFFSET_VALUE_METADATA_FIELD_V2, offsetAndMetadata.metadata)
+      value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2, offsetAndMetadata.commitTimestamp)
+    } else {
+      value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
+      value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
+      value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
+      // version 1 has a non empty expireTimestamp field
+      value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get)
+    }
+
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+    byteBuffer.putShort(version)
     value.writeTo(byteBuffer)
     byteBuffer.array()
   }
@@ -1102,19 +1121,30 @@
    *
    * @param groupMetadata current group metadata
    * @param assignment the assignment for the rebalancing generation
-   * @param version the version of the value message to use
+   * @param apiVersion the api version
    * @return payload for offset commit message
    */
   private[group] def groupMetadataValue(groupMetadata: GroupMetadata,
                                         assignment: Map[String, Array[Byte]],
-                                        version: Short = 0): Array[Byte] = {
-    val value = if (version == 0) new Struct(GROUP_METADATA_VALUE_SCHEMA_V0) else new Struct(CURRENT_GROUP_VALUE_SCHEMA)
+                                        apiVersion: ApiVersion): Array[Byte] = {
+
+    val (version, value) = {
+      if (apiVersion < KAFKA_0_10_1_IV0)
+        (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
+      else if (apiVersion < KAFKA_2_1_IV0)
+        (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
+      else
+        (2.toShort, new Struct(CURRENT_GROUP_VALUE_SCHEMA))
+    }
 
     value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
     value.set(GENERATION_KEY, groupMetadata.generationId)
     value.set(PROTOCOL_KEY, groupMetadata.protocolOrNull)
     value.set(LEADER_KEY, groupMetadata.leaderOrNull)
+
+    if (version >= 2)
+      value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault)
 
     val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
       val memberStruct = value.instance(MEMBERS_KEY)
       memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
@@ -1174,7 +1204,7 @@
       GroupMetadataKey(version, group)
     } else {
-      throw new IllegalStateException("Unknown version " + version + " for group metadata message")
+      throw new IllegalStateException(s"Unknown group metadata message version: $version")
     }
   }
@@ -1205,8 +1235,14 @@
         val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
 
         OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+      } else if (version == 2) {
+        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
+        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
+        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
+
+        OffsetAndMetadata(offset, metadata, commitTimestamp)
       } else {
-        throw new IllegalStateException("Unknown offset message version")
+        throw new IllegalStateException(s"Unknown offset message version: $version")
       }
     }
   }
@@ -1215,9 +1251,10 @@
    * Decodes the group metadata messages' payload and retrieves its member metadata from it
    *
    * @param buffer input byte-buffer
+   * @param time the time instance to use
    * @return a group metadata object from the message
    */
-  def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
+  def readGroupMessageValue(groupId: String, buffer: ByteBuffer, time: Time): GroupMetadata = {
     if (buffer == null) { // tombstone
       null
     } else {
@@ -1225,13 +1262,23 @@
       val valueSchema = schemaForGroup(version)
       val value = valueSchema.read(buffer)
 
-      if (version == 0 || version == 1) {
+      if (version >= 0 && version <= 2) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
+        val currentStateTimestamp: Option[Long] = version match {
+          case version if version == 2 =>
+            if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
+              val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
+              if (timestamp == -1) None else Some(timestamp)
+            } else
+              None
+          case _ =>
+            None
+        }
 
         val members = memberMetadataArray.map { memberMetadataObj =>
           val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
@@ -1247,9 +1294,9 @@
           member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
           member
         }
-        GroupMetadata.loadGroup(groupId, initialState, generationId, protocolType, protocol, leaderId, members)
+        GroupMetadata.loadGroup(groupId, initialState, generationId, protocolType, protocol, leaderId, currentStateTimestamp, members, time)
       } else {
-        throw new IllegalStateException("Unknown group metadata message version")
+        throw new IllegalStateException(s"Unknown group metadata message version: $version")
       }
     }
   }
@@ -1287,7 +1334,7 @@
       val value = consumerRecord.value
       val formattedValue =
         if (value == null) "NULL"
-        else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
+        else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value), Time.SYSTEM).toString
       output.write(groupId.getBytes(StandardCharsets.UTF_8))
       output.write("::".getBytes(StandardCharsets.UTF_8))
       output.write(formattedValue.getBytes(StandardCharsets.UTF_8))

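The net effect of the serialization changes: the value version written to __consumer_offsets is a function of inter.broker.protocol.version, so a cluster only starts writing v2 values (and dropping the expire_timestamp field) once the IBP is bumped to 2.1. A rough round-trip sketch, assuming a caller inside kafka.coordinator.group and that the read path is the readOffsetMessageValue method shown partially above:

    // Sketch (names from the hunks above; readOffsetMessageValue assumed public):
    val commit = OffsetAndMetadata(42L, "meta", System.currentTimeMillis())   // expireTimestamp = None
    val bytes = GroupMetadataManager.offsetCommitValue(commit, KAFKA_2_1_IV0) // IBP >= 2.1, no explicit
                                                                              // expiration -> schema v2
    val decoded = GroupMetadataManager.readOffsetMessageValue(java.nio.ByteBuffer.wrap(bytes))
    // decoded.expireTimestamp == None: the v2 record carries no expire_timestamp field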
KafkaApis.scala

@@ -332,33 +332,25 @@ class KafkaApis(val requestChannel: RequestChannel,
       } else {
         // for version 1 and beyond store offsets in offset manager
 
-        // compute the retention time based on the request version:
-        // if it is v1 or not specified by user, we can use the default retention
-        val offsetRetention =
-          if (header.apiVersion <= 1 ||
-            offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME)
-            groupCoordinator.offsetConfig.offsetsRetentionMs
-          else
-            offsetCommitRequest.retentionTime
-
         // commit timestamp is always set to now.
-        // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
         // expire timestamp is computed differently for v1 and v2.
-        //   - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
+        //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
         //   - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
-        //   - If v2 we use the default expiration timestamp
+        //   - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
+        //   - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
         val currentTimestamp = time.milliseconds
-        val defaultExpireTimestamp = offsetRetention + currentTimestamp
         val partitionData = authorizedTopicRequestInfo.mapValues { partitionData =>
           val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
           new OffsetAndMetadata(
             offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
-            commitTimestamp = currentTimestamp,
-            expireTimestamp = {
-              if (partitionData.timestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
-                defaultExpireTimestamp
-              else
-                offsetRetention + partitionData.timestamp
+            commitTimestamp = partitionData.timestamp match {
+              case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
+              case customTimestamp => customTimestamp
+            },
+            expireTimestamp = offsetCommitRequest.retentionTime match {
+              case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
+              case retentionTime => Some(currentTimestamp + retentionTime)
             }
           )
         }
@@ -1912,7 +1904,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         topicPartition -> new OffsetAndMetadata(
           offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
           commitTimestamp = currentTimestamp,
-          expireTimestamp = defaultExpireTimestamp)
+          expireTimestamp = Some(defaultExpireTimestamp))
       }
     }

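So on the request path the broker no longer computes a default expiration: only an explicit client-supplied retention (possible through v4) yields Some(...). A condensed restatement of the mapping above, with an illustrative helper name:

    // retentionTime from the OffsetCommitRequest -> OffsetAndMetadata.expireTimestamp
    def expireTimestampFor(retentionTime: Long, nowMs: Long): Option[Long] =
      if (retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME) None  // v5+, or left unset:
                                                                             // group-level expiration applies
      else Some(nowMs + retentionTime)                                       // explicit retention from
                                                                             // v2..v4 clients: legacy deadline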
ConsoleConsumer.scala

@@ -368,6 +368,10 @@ object ConsoleConsumer extends Logging {
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group)
       case None =>
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}")
+        // By default, avoid unnecessary expansion of the coordinator cache since
+        // the auto-generated group and its offsets is not intended to be used again
+        if (!consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+          consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
         groupIdPassed = false
     }

DumpLogSegments.scala

@@ -29,7 +29,7 @@ import kafka.utils._
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 
 import scala.collection.{Map, mutable}
 import scala.collection.mutable.ArrayBuffer
@@ -321,7 +321,7 @@ object DumpLogSegments {
   private def parseGroupMetadata(groupMetadataKey: GroupMetadataKey, payload: ByteBuffer) = {
     val groupId = groupMetadataKey.key
-    val group = GroupMetadataManager.readGroupMessageValue(groupId, payload)
+    val group = GroupMetadataManager.readGroupMessageValue(groupId, payload, Time.SYSTEM)
     val protocolType = group.protocolType.getOrElse("")
 
     val assignment = group.allMemberMetadata.map { member =>

AuthorizerIntegrationTest.scala

@@ -334,7 +334,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   private def createOffsetCommitRequest = {
     new requests.OffsetCommitRequest.Builder(
       group, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
-      setMemberId("").setGenerationId(1).setRetentionTime(1000).
+      setMemberId("").setGenerationId(1).
       build()
   }

ApiVersionTest.scala

@@ -79,6 +79,9 @@ class ApiVersionTest {
     assertEquals(KAFKA_2_0_IV1, ApiVersion("2.0"))
     assertEquals(KAFKA_2_0_IV0, ApiVersion("2.0-IV0"))
     assertEquals(KAFKA_2_0_IV1, ApiVersion("2.0-IV1"))
+
+    assertEquals(KAFKA_2_1_IV0, ApiVersion("2.1"))
+    assertEquals(KAFKA_2_1_IV0, ApiVersion("2.1-IV0"))
   }
 
   @Test

GroupMetadataManagerTest.scala

@ -17,20 +17,22 @@
package kafka.coordinator.group package kafka.coordinator.group
import kafka.api.ApiVersion import kafka.api.{ApiVersion, KAFKA_1_1_IV0, KAFKA_2_1_IV0}
import kafka.cluster.Partition import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata import kafka.common.OffsetAndMetadata
import kafka.log.{Log, LogAppendInfo} import kafka.log.{Log, LogAppendInfo}
import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager} import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
import kafka.utils.TestUtils.fail import kafka.utils.TestUtils.fail
import kafka.utils.{KafkaScheduler, MockTime, TestUtils} import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._ import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse} import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.easymock.{Capture, EasyMock, IAnswer} import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.Assert.{assertEquals, assertFalse, assertTrue, assertNull} import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
import org.junit.{Before, Test} import org.junit.{Before, Test}
import java.nio.ByteBuffer import java.nio.ByteBuffer
@ -52,6 +54,7 @@ class GroupMetadataManagerTest {
var scheduler: KafkaScheduler = null var scheduler: KafkaScheduler = null
var zkClient: KafkaZkClient = null var zkClient: KafkaZkClient = null
var partition: Partition = null var partition: Partition = null
var defaultOffsetRetentionMs = Long.MaxValue
val groupId = "foo" val groupId = "foo"
val groupPartitionId = 0 val groupPartitionId = 0
@ -75,6 +78,8 @@ class GroupMetadataManagerTest {
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs
// make two partitions of the group topic to make sure some partitions are not owned by the coordinator // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2)) EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
@ -506,7 +511,7 @@ class GroupMetadataManagerTest {
// group is owned but does not exist yet // group is owned but does not exist yet
assertTrue(groupMetadataManager.groupNotExists(groupId)) assertTrue(groupMetadataManager.groupNotExists(groupId))
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
// group is owned but not Dead // group is owned but not Dead
@ -616,6 +621,7 @@ class GroupMetadataManagerTest {
assertEquals(committedOffsets.size, group.allOffsets.size) assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) => committedOffsets.foreach { case (topicPartition, offset) =>
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
assertTrue(group.offset(topicPartition).map(_.expireTimestamp).contains(None))
} }
} }
@ -729,9 +735,9 @@ class GroupMetadataManagerTest {
@Test @Test
def testAddGroup() { def testAddGroup() {
val group = new GroupMetadata("foo", initialState = Empty) val group = new GroupMetadata("foo", Empty, time)
assertEquals(group, groupMetadataManager.addGroup(group)) assertEquals(group, groupMetadataManager.addGroup(group))
assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo", initialState = Empty))) assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo", Empty, time)))
} }
@Test @Test
@ -739,7 +745,7 @@ class GroupMetadataManagerTest {
val generation = 27 val generation = 27
val protocolType = "consumer" val protocolType = "consumer"
val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty) val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, Seq.empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
val capturedRecords = expectAppendMessage(Errors.NONE) val capturedRecords = expectAppendMessage(Errors.NONE)
@ -758,7 +764,7 @@ class GroupMetadataManagerTest {
assertEquals(1, records.size) assertEquals(1, records.size)
val record = records.head val record = records.head
val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value) val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
assertTrue(groupMetadata.is(Empty)) assertTrue(groupMetadata.is(Empty))
assertEquals(generation, groupMetadata.generationId) assertEquals(generation, groupMetadata.generationId)
assertEquals(Some(protocolType), groupMetadata.protocolType) assertEquals(Some(protocolType), groupMetadata.protocolType)
@ -766,7 +772,7 @@ class GroupMetadataManagerTest {
@Test @Test
def testStoreEmptySimpleGroup() { def testStoreEmptySimpleGroup() {
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
val capturedRecords = expectAppendMessage(Errors.NONE) val capturedRecords = expectAppendMessage(Errors.NONE)
@ -787,7 +793,7 @@ class GroupMetadataManagerTest {
assertEquals(1, records.size) assertEquals(1, records.size)
val record = records.head val record = records.head
val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value) val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
assertTrue(groupMetadata.is(Empty)) assertTrue(groupMetadata.is(Empty))
assertEquals(0, groupMetadata.generationId) assertEquals(0, groupMetadata.generationId)
assertEquals(None, groupMetadata.protocolType) assertEquals(None, groupMetadata.protocolType)
@ -809,7 +815,7 @@ class GroupMetadataManagerTest {
private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors) { private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors) {
EasyMock.reset(replicaManager) EasyMock.reset(replicaManager)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
expectAppendMessage(appendError) expectAppendMessage(appendError)
@ -832,7 +838,7 @@ class GroupMetadataManagerTest {
val clientId = "clientId" val clientId = "clientId"
val clientHost = "localhost" val clientHost = "localhost"
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
@ -863,7 +869,7 @@ class GroupMetadataManagerTest {
val clientId = "clientId" val clientId = "clientId"
val clientHost = "localhost" val clientHost = "localhost"
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]()))) protocolType, List(("protocol", Array[Byte]())))
@ -893,7 +899,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId) groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@ -935,7 +941,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId) groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -975,7 +981,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId) groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1014,7 +1020,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId) groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1052,7 +1058,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId) groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1094,7 +1100,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId) groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1132,7 +1138,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId) groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
// expire the offset after 1 millisecond // expire the offset after 1 millisecond
@@ -1185,7 +1191,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId) groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
group.generationId = 5 group.generationId = 5
@@ -1233,7 +1239,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId) groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
group.generationId = 5 group.generationId = 5
@@ -1287,7 +1293,7 @@ class GroupMetadataManagerTest {
groupMetadataManager.addPartitionOwnership(groupPartitionId) groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
// expire the offset after 1 millisecond // expire the offset after 1 millisecond
@@ -1348,31 +1354,39 @@ class GroupMetadataManagerTest {
} }
@Test @Test
def testExpireOffsetsWithActiveGroup() { def testOffsetExpirationSemantics() {
val memberId = "memberId" val memberId = "memberId"
val clientId = "clientId" val clientId = "clientId"
val clientHost = "localhost" val clientHost = "localhost"
val topicPartition1 = new TopicPartition("foo", 0) val topic = "foo"
val topicPartition2 = new TopicPartition("foo", 1) val topicPartition1 = new TopicPartition(topic, 0)
val topicPartition2 = new TopicPartition(topic, 1)
val topicPartition3 = new TopicPartition(topic, 2)
val offset = 37 val offset = 37
groupMetadataManager.addPartitionOwnership(groupPartitionId) groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, initialState = Empty) val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
val subscription = new Subscription(List(topic).asJava)
val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]()))) protocolType, List(("protocol", ConsumerProtocol.serializeSubscription(subscription).array())))
member.awaitingJoinCallback = _ => () member.awaitingJoinCallback = _ => ()
group.add(member) group.add(member)
group.transitionTo(PreparingRebalance) group.transitionTo(PreparingRebalance)
group.initNextGeneration() group.initNextGeneration()
// expire the offset after 1 millisecond
val startMs = time.milliseconds val startMs = time.milliseconds
// old clients, expiry timestamp is explicitly set
val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs, startMs + 1)
val tp2OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs, startMs + 3)
// new clients, no per-partition expiry timestamp; the group's offsets expire together
val tp3OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
val offsets = immutable.Map( val offsets = immutable.Map(
topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1), topicPartition1 -> tp1OffsetAndMetadata,
topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3)) topicPartition2 -> tp2OffsetAndMetadata,
topicPartition3 -> tp3OffsetAndMetadata)
mockGetPartition() mockGetPartition()
expectAppendMessage(Errors.NONE) expectAppendMessage(Errors.NONE)
@@ -1389,8 +1403,26 @@ class GroupMetadataManagerTest {
assertFalse(commitErrors.isEmpty) assertFalse(commitErrors.isEmpty)
assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1)) assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
// expire all of the offsets // do not expire any offsets even though the expiration timestamp has been reached for one (the group is still active)
time.sleep(4) time.sleep(2)
groupMetadataManager.cleanupGroupMetadata()
// group and offsets should still be there
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicPartition1))
assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicPartition2))
assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
var cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
assertEquals(Some(offset), cachedOffsets.get(topicPartition1).map(_.offset))
assertEquals(Some(offset), cachedOffsets.get(topicPartition2).map(_.offset))
assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
EasyMock.verify(replicaManager)
group.transitionTo(PreparingRebalance)
group.transitionTo(Empty)
// expect the offset tombstone // expect the offset tombstone
EasyMock.reset(partition) EasyMock.reset(partition)
@@ -1401,16 +1433,245 @@ class GroupMetadataManagerTest {
groupMetadataManager.cleanupGroupMetadata() groupMetadataManager.cleanupGroupMetadata()
// group should still be there, but the offsets should be gone // group is empty now, only one offset should expire
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
assertEquals(None, group.offset(topicPartition1))
assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicPartition2))
assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
assertEquals(Some(offset), cachedOffsets.get(topicPartition2).map(_.offset))
assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
EasyMock.verify(replicaManager)
time.sleep(2)
// expect the offset tombstone
EasyMock.reset(partition)
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
.andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.replay(partition)
groupMetadataManager.cleanupGroupMetadata()
// one more offset should expire
assertEquals(Some(group), groupMetadataManager.getGroup(groupId)) assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
assertEquals(None, group.offset(topicPartition1)) assertEquals(None, group.offset(topicPartition1))
assertEquals(None, group.offset(topicPartition2)) assertEquals(None, group.offset(topicPartition2))
assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2))) cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset)) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset)) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
EasyMock.verify(replicaManager) EasyMock.verify(replicaManager)
// advance time to just before the last partition's offset is due to expire; no offsets should expire yet
time.sleep(group.currentStateTimestamp.get + defaultOffsetRetentionMs - time.milliseconds() - 1)
groupMetadataManager.cleanupGroupMetadata()
// the group and its remaining offset should still be there
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
assertEquals(None, group.offset(topicPartition1))
assertEquals(None, group.offset(topicPartition2))
assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
EasyMock.verify(replicaManager)
// advance time enough for that last offset to expire
time.sleep(2)
// expect the offset tombstone
EasyMock.reset(partition)
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
.andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.replay(partition)
groupMetadataManager.cleanupGroupMetadata()
// group and all its offsets should be gone now
assertEquals(None, groupMetadataManager.getGroup(groupId))
assertEquals(None, group.offset(topicPartition1))
assertEquals(None, group.offset(topicPartition2))
assertEquals(None, group.offset(topicPartition3))
cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition3).map(_.offset))
EasyMock.verify(replicaManager)
assert(group.is(Dead))
}
@Test
def testOffsetExpirationOfSimpleConsumer() {
val memberId = "memberId"
val clientId = "clientId"
val clientHost = "localhost"
val topic = "foo"
val topicPartition1 = new TopicPartition(topic, 0)
val offset = 37
groupMetadataManager.addPartitionOwnership(groupPartitionId)
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
// the offset should expire once the default retention period has passed since the commit
val startMs = time.milliseconds
// standalone (simple) consumer: no per-partition expiry timestamp is set
val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
val offsets = immutable.Map(
topicPartition1 -> tp1OffsetAndMetadata)
mockGetPartition()
expectAppendMessage(Errors.NONE)
EasyMock.replay(replicaManager)
var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
def callback(errors: immutable.Map[TopicPartition, Errors]) {
commitErrors = Some(errors)
}
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
// do not expire the offset while still within the retention period following the commit timestamp
val expiryTimestamp = offsets.get(topicPartition1).get.commitTimestamp + defaultOffsetRetentionMs
time.sleep(expiryTimestamp - time.milliseconds() - 1)
groupMetadataManager.cleanupGroupMetadata()
// group and offsets should still be there
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicPartition1))
var cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1)))
assertEquals(Some(offset), cachedOffsets.get(topicPartition1).map(_.offset))
EasyMock.verify(replicaManager)
// advance time enough for the offset to expire
time.sleep(2)
// expect the offset tombstone
EasyMock.reset(partition)
EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
.andReturn(LogAppendInfo.UnknownLogAppendInfo)
EasyMock.replay(partition)
groupMetadataManager.cleanupGroupMetadata()
// group and all its offsets should be gone now
assertEquals(None, groupMetadataManager.getGroup(groupId))
assertEquals(None, group.offset(topicPartition1))
cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1)))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
EasyMock.verify(replicaManager)
assert(group.is(Dead))
}
@Test
def testLoadOffsetFromOldCommit() = {
val groupMetadataTopicPartition = groupTopicPartition
val generation = 935
val protocolType = "consumer"
val protocol = "range"
val startOffset = 15L
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
new TopicPartition("foo", 1) -> 455L,
new TopicPartition("bar", 0) -> 8992L
)
val apiVersion = KAFKA_1_1_IV0
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, apiVersion = apiVersion, retentionTime = Some(100))
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
EasyMock.replay(replicaManager)
groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
assertEquals(Stable, group.currentState)
assertEquals(memberId, group.leaderOrNull)
assertEquals(generation, group.generationId)
assertEquals(Some(protocolType), group.protocolType)
assertEquals(protocol, group.protocolOrNull)
assertEquals(Set(memberId), group.allMembers)
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
assertTrue(group.offset(topicPartition).map(_.expireTimestamp).get.nonEmpty)
}
}
@Test
def testLoadOffsetWithExplicitRetention() = {
val groupMetadataTopicPartition = groupTopicPartition
val generation = 935
val protocolType = "consumer"
val protocol = "range"
val startOffset = 15L
val committedOffsets = Map(
new TopicPartition("foo", 0) -> 23L,
new TopicPartition("foo", 1) -> 455L,
new TopicPartition("bar", 0) -> 8992L
)
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, retentionTime = Some(100))
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
EasyMock.replay(replicaManager)
groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
assertEquals(Stable, group.currentState)
assertEquals(memberId, group.leaderOrNull)
assertEquals(generation, group.generationId)
assertEquals(Some(protocolType), group.protocolType)
assertEquals(protocol, group.protocolOrNull)
assertEquals(Set(memberId), group.allMembers)
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
assertTrue(group.offset(topicPartition).map(_.expireTimestamp).get.nonEmpty)
}
} }
private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse] => Unit] = { private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse] => Unit] = {
@@ -1452,20 +1713,21 @@ class GroupMetadataManagerTest {
private def buildStableGroupRecordWithMember(generation: Int, private def buildStableGroupRecordWithMember(generation: Int,
protocolType: String, protocolType: String,
protocol: String, protocol: String,
memberId: String): SimpleRecord = { memberId: String,
apiVersion: ApiVersion = ApiVersion.latestVersion): SimpleRecord = {
val memberProtocols = List((protocol, Array.emptyByteArray)) val memberProtocols = List((protocol, Array.emptyByteArray))
val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols) val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, memberId,
leaderId = memberId, Seq(member)) if (apiVersion >= KAFKA_2_1_IV0) Some(time.milliseconds()) else None, Seq(member), time)
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId) val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte])) val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]), apiVersion)
new SimpleRecord(groupMetadataKey, groupMetadataValue) new SimpleRecord(groupMetadataKey, groupMetadataValue)
} }
private def buildEmptyGroupRecord(generation: Int, protocolType: String): SimpleRecord = { private def buildEmptyGroupRecord(generation: Int, protocolType: String): SimpleRecord = {
val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty) val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, Seq.empty, time)
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId) val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty) val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty, ApiVersion.latestVersion)
new SimpleRecord(groupMetadataKey, groupMetadataValue) new SimpleRecord(groupMetadataKey, groupMetadataValue)
} }
@@ -1511,11 +1773,19 @@ class GroupMetadataManagerTest {
} }
private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long], private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
groupId: String = groupId): Seq[SimpleRecord] = { groupId: String = groupId,
apiVersion: ApiVersion = ApiVersion.latestVersion,
retentionTime: Option[Long] = None): Seq[SimpleRecord] = {
committedOffsets.map { case (topicPartition, offset) => committedOffsets.map { case (topicPartition, offset) =>
val offsetAndMetadata = OffsetAndMetadata(offset) val offsetAndMetadata = retentionTime match {
case Some(timestamp) =>
val commitTimestamp = time.milliseconds()
OffsetAndMetadata(offset, "", commitTimestamp, commitTimestamp + timestamp)
case None =>
OffsetAndMetadata(offset)
}
val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition) val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata) val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
new SimpleRecord(offsetCommitKey, offsetCommitValue) new SimpleRecord(offsetCommitKey, offsetCommitValue)
}.toSeq }.toSeq
} }
@@ -1542,7 +1812,7 @@ class GroupMetadataManagerTest {
def testMetrics() { def testMetrics() {
groupMetadataManager.cleanupGroupMetadata() groupMetadataManager.cleanupGroupMetadata()
expectMetrics(groupMetadataManager, 0, 0, 0) expectMetrics(groupMetadataManager, 0, 0, 0)
val group = new GroupMetadata("foo2", Stable) val group = new GroupMetadata("foo2", Stable, time)
groupMetadataManager.addGroup(group) groupMetadataManager.addGroup(group)
expectMetrics(groupMetadataManager, 1, 0, 0) expectMetrics(groupMetadataManager, 1, 0, 0)
group.transitionTo(PreparingRebalance) group.transitionTo(PreparingRebalance)

View File

@@ -18,9 +18,8 @@
package kafka.coordinator.group package kafka.coordinator.group
import kafka.common.OffsetAndMetadata import kafka.common.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
import org.junit.Assert._ import org.junit.Assert._
import org.junit.{Before, Test} import org.junit.{Before, Test}
import org.scalatest.junit.JUnitSuite import org.scalatest.junit.JUnitSuite
@@ -40,7 +39,7 @@ class GroupMetadataTest extends JUnitSuite {
@Before @Before
def setUp() { def setUp() {
group = new GroupMetadata("groupId", initialState = Empty) group = new GroupMetadata("groupId", Empty, Time.SYSTEM)
} }
@Test @Test

View File

@@ -240,7 +240,7 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.OFFSET_COMMIT => case ApiKeys.OFFSET_COMMIT =>
new OffsetCommitRequest.Builder("test-group", new OffsetCommitRequest.Builder("test-group",
Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava). Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
setMemberId("").setGenerationId(1).setRetentionTime(1000) setMemberId("").setGenerationId(1)
case ApiKeys.OFFSET_FETCH => case ApiKeys.OFFSET_FETCH =>
new OffsetFetchRequest.Builder("test-group", List(tp).asJava) new OffsetFetchRequest.Builder("test-group", List(tp).asJava)
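
For context, a minimal sketch of client-side usage after this change (hypothetical values; the builder and PartitionData signatures are the ones shown in the diff above). Note the absence of any retention-time setter:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.OffsetCommitRequest;

// hypothetical usage sketch: commit offset 42 for foo-0 at protocol version 5;
// retention is now controlled solely by the broker (offsets.retention.minutes)
Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>();
offsetData.put(new TopicPartition("foo", 0),
    new OffsetCommitRequest.PartitionData(42L, "metadata"));
OffsetCommitRequest request = new OffsetCommitRequest.Builder("test-group", offsetData)
    .setMemberId("member-1")
    .setGenerationId(1)
    .build((short) 5);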

View File

@@ -19,6 +19,18 @@
<script id="upgrade-template" type="text/x-handlebars-template"> <script id="upgrade-template" type="text/x-handlebars-template">
<h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 2.0.0 to 2.1.0</a></h4>
<p><b>Additional Upgrade Notes:</b></p>
<ol>
<li>Offset expiration semantics have changed slightly in this version. Under the new semantics, offsets of partitions in a group will
not be removed while the group is subscribed to the corresponding topic and is still active (has active consumers). If a group becomes
empty, all of its offsets will be removed once the default offset retention period (or the one set on the broker) has passed, unless the
group becomes active again. Offsets associated with standalone (simple) consumers, which do not use Kafka group management, will be
removed once the default offset retention period (or the one set on the broker) has passed since their last commit. A broker-side
configuration sketch follows this list.</li>
</ol>
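<p>For reference, a sketch of the broker-side settings that govern this retention (the values shown are illustrative, not a recommendation):</p>
<pre>
# server.properties: committed offsets are kept this long after the group
# becomes empty (or, for standalone consumers, after the last commit)
offsets.retention.minutes=10080
# how often the broker scans for expired offsets
offsets.retention.check.interval.ms=600000
</pre>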
<h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.0</a></h4> <h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.0</a></h4>
<p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below, <p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_200_notable">notable changes in 2.0.0</a> before upgrading. you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_200_notable">notable changes in 2.0.0</a> before upgrading.
@@ -66,6 +78,9 @@
until all brokers in the cluster have been updated. until all brokers in the cluster have been updated.
<p><b>NOTE:</b> any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored should the cluster be downgraded again. <p><b>NOTE:</b> any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored should the cluster be downgraded again.
</li> </li>
<li>The console consumer's <code>enable.auto.commit</code> property now defaults to <code>false</code> when no <code>group.id</code> is provided.
This avoids polluting the consumer coordinator cache, since the auto-generated group is unlikely to be used by other consumers (see the example after this list).
</li>
</ol> </ol>
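<p>For example, a console consumer run without a <code>group.id</code> can opt back in to auto commit explicitly (server and topic names below are placeholders):</p>
<pre>
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic \
    --consumer-property enable.auto.commit=true
</pre>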
<h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2.0.0</a></h5> <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2.0.0</a></h5>
@@ -112,9 +127,9 @@
<code>beginningOffsets</code>, <code>endOffsets</code> and <code>close</code> that take in a <code>Duration</code>.</li> <code>beginningOffsets</code>, <code>endOffsets</code> and <code>close</code> that take in a <code>Duration</code>.</li>
<li>Also as part of KIP-266, the default value of <code>request.timeout.ms</code> has been changed to 30 seconds. <li>Also as part of KIP-266, the default value of <code>request.timeout.ms</code> has been changed to 30 seconds.
The previous value was a little higher than 5 minutes to account for the maximum time that a rebalance would take. The previous value was a little higher than 5 minutes to account for the maximum time that a rebalance would take.
Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from
<code>max.poll.interval.ms</code> for the request timeout. All other request types use the timeout defined <code>max.poll.interval.ms</code> for the request timeout. All other request types use the timeout defined
by <code>request.timeout.ms</code></li> by <code>request.timeout.ms</code></li>
<li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li> <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
<li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li> <li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a> removes <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a> removes