diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index cbdb8543d92..2c0236ee0cd 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -143,8 +143,9 @@ class GroupMetadataManager(val brokerId: Int, // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and // retry removing this group. val groupPartition = partitionFor(group.groupId) + val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition) val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId), - timestamp = time.milliseconds(), magicValue = getMessageFormatVersion(groupPartition)) + timestamp = timestamp, magicValue = magicValue) val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition) partitionOpt.foreach { partition => @@ -169,12 +170,12 @@ class GroupMetadataManager(val brokerId: Int, def prepareStoreGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], responseCallback: Short => Unit): DelayedStore = { + val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId)) val message = new Message( key = GroupMetadataManager.groupMetadataKey(group.groupId), bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment), - timestamp = time.milliseconds(), - magicValue = getMessageFormatVersion(partitionFor(group.groupId)) - ) + timestamp = timestamp, + magicValue = magicValue) val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId)) @@ -253,11 +254,12 @@ class GroupMetadataManager(val brokerId: Int, // construct the message set to append val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => + val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(groupId)) new Message( key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition), bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata), - timestamp = time.milliseconds(), - magicValue = getMessageFormatVersion(partitionFor(groupId)) + timestamp = timestamp, + magicValue = magicValue ) }.toSeq @@ -557,8 +559,8 @@ class GroupMetadataManager(val brokerId: Int, val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group, groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition) - (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = time.milliseconds(), - magicValue = getMessageFormatVersion(offsetsPartition))) + val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition) + (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)) }.groupBy { case (partition, tombstone) => partition } // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say, @@ -627,11 +629,13 @@ class GroupMetadataManager(val brokerId: Int, config.offsetsTopicNumPartitions } - private def getMessageFormatVersion(partition: Int): Byte = { + private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = { val groupMetadataTopicAndPartition = new TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partition) - replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse { + val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse { throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found") } + val timestamp = if (messageFormatVersion == Message.MagicValue_V0) Message.NoTimestamp else time.milliseconds() + (messageFormatVersion, timestamp) } /**