KAFKA-7437; Persist leader epoch in offset commit metadata (#5689)

This commit implements the changes described in KIP-320 for the persistence of leader epoch information in the offset commit protocol.

Reviewers:  Rajini Sivaram <rajinisivaram@googlemail.com>
This commit is contained in:
Jason Gustafson 2018-09-26 06:25:55 -07:00 committed by Rajini Sivaram
parent 5916db11f6
commit 9f7267dd2f
10 changed files with 290 additions and 183 deletions

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
import java.util.Optional
case class OffsetAndMetadata(offset: Long,
leaderEpoch: Optional[Integer],
metadata: String,
commitTimestamp: Long,
expireTimestamp: Option[Long]) {
override def toString: String = {
s"OffsetAndMetadata(offset=$offset" +
s", leaderEpoch=$leaderEpoch" +
s", metadata=$metadata" +
s", commitTimestamp=$commitTimestamp" +
s", expireTimestamp=$expireTimestamp)"
}
}
object OffsetAndMetadata {
val NoMetadata: String = ""
def apply(offset: Long, metadata: String, commitTimestamp: Long): OffsetAndMetadata = {
OffsetAndMetadata(offset, Optional.empty(), metadata, commitTimestamp, None)
}
def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long): OffsetAndMetadata = {
OffsetAndMetadata(offset, Optional.empty(), metadata, commitTimestamp, Some(expireTimestamp))
}
def apply(offset: Long, leaderEpoch: Optional[Integer], metadata: String, commitTimestamp: Long): OffsetAndMetadata = {
OffsetAndMetadata(offset, leaderEpoch, metadata, commitTimestamp, None)
}
}

View File

@ -1,81 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.common
import org.apache.kafka.common.protocol.Errors
case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata) {
override def toString = "OffsetMetadata[%d,%s]"
.format(offset,
if (metadata != null && metadata.length > 0) metadata else "NO_METADATA")
}
object OffsetMetadata {
val InvalidOffset: Long = -1L
val NoMetadata: String = ""
val InvalidOffsetMetadata = OffsetMetadata(OffsetMetadata.InvalidOffset, OffsetMetadata.NoMetadata)
}
case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
expireTimestamp: Option[Long] = None) {
def offset = offsetMetadata.offset
def metadata = offsetMetadata.metadata
override def toString = s"[$offsetMetadata,CommitTime $commitTimestamp,ExpirationTime ${expireTimestamp.getOrElse("_")}]"
}
object OffsetAndMetadata {
def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), commitTimestamp, Some(expireTimestamp))
def apply(offset: Long, metadata: String, timestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), timestamp)
def apply(offset: Long, metadata: String) = new OffsetAndMetadata(OffsetMetadata(offset, metadata))
def apply(offset: Long) = new OffsetAndMetadata(OffsetMetadata(offset, OffsetMetadata.NoMetadata))
}
case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Errors = Errors.NONE) {
def offset = offsetMetadata.offset
def metadata = offsetMetadata.metadata
override def toString = "[%s, Error=%s]".format(offsetMetadata, error)
}
object OffsetMetadataAndError {
val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE)
val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.COORDINATOR_LOAD_IN_PROGRESS)
val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID)
val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR)
val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.COORDINATOR_NOT_AVAILABLE)
val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION)
val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION)
def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), Errors.NONE)
def apply(error: Errors) = new OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, error)
def apply(offset: Long, metadata: String, error: Errors) = new OffsetMetadataAndError(OffsetMetadata(offset, metadata), error)
}

View File

@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0}
import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1}
import kafka.common.{MessageFormatter, OffsetAndMetadata}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.ReplicaManager
@ -460,7 +460,7 @@ class GroupMetadataManager(brokerId: Int,
// that commit offsets to Kafka.)
group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
topicPartition -> new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset,
Optional.empty(), offsetAndMetadata.metadata, Errors.NONE)
offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
}
case Some(topicPartitions) =>
@ -471,7 +471,7 @@ class GroupMetadataManager(brokerId: Int,
Optional.empty(), "", Errors.NONE)
case Some(offsetAndMetadata) =>
new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset,
Optional.empty(), offsetAndMetadata.metadata, Errors.NONE)
offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
}
topicPartition -> partitionData
}.toMap
@ -960,6 +960,16 @@ object GroupMetadataManager {
private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
new Field("offset", INT64),
new Field("leader_epoch", INT32),
new Field("metadata", STRING, "Associated metadata.", ""),
new Field("commit_timestamp", INT64))
private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
@ -1019,7 +1029,6 @@ object GroupMetadataManager {
new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
// map of versions to key schemas as data types
private val MESSAGE_TYPE_SCHEMAS = Map(
0 -> OFFSET_COMMIT_KEY_SCHEMA,
@ -1030,21 +1039,18 @@ object GroupMetadataManager {
private val OFFSET_VALUE_SCHEMAS = Map(
0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2)
2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
// map of version of group metadata value schemas
private val GROUP_VALUE_SCHEMAS = Map(
0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 2.toShort
private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(2)
private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
private def schemaForKey(version: Int) = {
val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
schemaOpt match {
@ -1053,7 +1059,7 @@ object GroupMetadataManager {
}
}
private def schemaForOffset(version: Int) = {
private def schemaForOffsetValue(version: Int) = {
val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
schemaOpt match {
case Some(schema) => schema
@ -1061,7 +1067,7 @@ object GroupMetadataManager {
}
}
private def schemaForGroup(version: Int) = {
private def schemaForGroupValue(version: Int) = {
val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
schemaOpt match {
case Some(schema) => schema
@ -1074,8 +1080,8 @@ object GroupMetadataManager {
*
* @return key for offset commit message
*/
private[group] def offsetCommitKey(group: String, topicPartition: TopicPartition,
versionId: Short = 0): Array[Byte] = {
private[group] def offsetCommitKey(group: String,
topicPartition: TopicPartition): Array[Byte] = {
val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
key.set(OFFSET_KEY_GROUP_FIELD, group)
key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
@ -1113,27 +1119,34 @@ object GroupMetadataManager {
apiVersion: ApiVersion): Array[Byte] = {
// generate commit value according to schema version
val (version, value) = {
if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty)
// if an older version of the API is used, or if an explicit expiration is provided, use the older schema
(1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
else
(2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
}
if (version == 2) {
value.set(OFFSET_VALUE_OFFSET_FIELD_V2, offsetAndMetadata.offset)
value.set(OFFSET_VALUE_METADATA_FIELD_V2, offsetAndMetadata.metadata)
value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2, offsetAndMetadata.commitTimestamp)
} else {
value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
// version 1 has a non empty expireTimestamp field
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) {
val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1)
value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
// version 1 has a non empty expireTimestamp field
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1,
offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
(1, value)
} else if (apiVersion < KAFKA_2_1_IV1) {
val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2)
value.set(OFFSET_VALUE_OFFSET_FIELD_V2, offsetAndMetadata.offset)
value.set(OFFSET_VALUE_METADATA_FIELD_V2, offsetAndMetadata.metadata)
value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2, offsetAndMetadata.commitTimestamp)
(2, value)
} else {
val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V3)
value.set(OFFSET_VALUE_OFFSET_FIELD_V3, offsetAndMetadata.offset)
value.set(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3,
offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
value.set(OFFSET_VALUE_METADATA_FIELD_V3, offsetAndMetadata.metadata)
value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3, offsetAndMetadata.commitTimestamp)
(3, value)
}
}
val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
byteBuffer.putShort(version)
byteBuffer.putShort(version.toShort)
value.writeTo(byteBuffer)
byteBuffer.array()
}
@ -1157,7 +1170,7 @@ object GroupMetadataManager {
else if (apiVersion < KAFKA_2_1_IV0)
(1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
else
(2.toShort, new Struct(CURRENT_GROUP_VALUE_SCHEMA))
(2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
}
value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
@ -1242,7 +1255,7 @@ object GroupMetadataManager {
null
} else {
val version = buffer.getShort
val valueSchema = schemaForOffset(version)
val valueSchema = schemaForOffsetValue(version)
val value = valueSchema.read(buffer)
if (version == 0) {
@ -1264,6 +1277,14 @@ object GroupMetadataManager {
val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
OffsetAndMetadata(offset, metadata, commitTimestamp)
} else if (version == 3) {
val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
val leaderEpochOpt: Optional[Integer] = if (leaderEpoch < 0) Optional.empty() else Optional.of(leaderEpoch)
OffsetAndMetadata(offset, leaderEpochOpt, metadata, commitTimestamp)
} else {
throw new IllegalStateException(s"Unknown offset message version: $version")
}
@ -1282,7 +1303,7 @@ object GroupMetadataManager {
null
} else {
val version = buffer.getShort
val valueSchema = schemaForGroup(version)
val valueSchema = schemaForGroupValue(version)
val value = valueSchema.read(buffer)
if (version >= 0 && version <= 2) {

View File

@ -27,7 +27,7 @@ import java.util.{Collections, Optional, Properties}
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.cluster.Partition
import kafka.common.{OffsetAndMetadata, OffsetMetadata}
import kafka.common.OffsetAndMetadata
import kafka.controller.KafkaController
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
@ -340,9 +340,11 @@ class KafkaApis(val requestChannel: RequestChannel,
// - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
val currentTimestamp = time.milliseconds
val partitionData = authorizedTopicRequestInfo.mapValues { partitionData =>
val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
val metadata = if (partitionData.metadata == null) OffsetAndMetadata.NoMetadata else partitionData.metadata
new OffsetAndMetadata(
offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
offset = partitionData.offset,
leaderEpoch = partitionData.leaderEpoch,
metadata = metadata,
commitTimestamp = partitionData.timestamp match {
case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
case customTimestamp => customTimestamp
@ -1907,15 +1909,15 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def convertTxnOffsets(offsetsMap: immutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
val offsetRetention = groupCoordinator.offsetConfig.offsetsRetentionMs
val currentTimestamp = time.milliseconds
val defaultExpireTimestamp = offsetRetention + currentTimestamp
offsetsMap.map { case (topicPartition, partitionData) =>
val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
val metadata = if (partitionData.metadata == null) OffsetAndMetadata.NoMetadata else partitionData.metadata
topicPartition -> new OffsetAndMetadata(
offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
offset = partitionData.offset,
leaderEpoch = partitionData.leaderEpoch,
metadata = metadata,
commitTimestamp = currentTimestamp,
expireTimestamp = Some(defaultExpireTimestamp))
expireTimestamp = None)
}
}

View File

@ -14,7 +14,7 @@ package kafka.api
import java.util
import java.util.regex.Pattern
import java.util.{Collections, Locale, Properties}
import java.util.{Collections, Locale, Optional, Properties}
import kafka.log.LogConfig
import kafka.server.KafkaConfig
@ -503,7 +503,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumer.assign(List(tp).asJava)
// sync commit
val syncMetadata = new OffsetAndMetadata(5, "foo")
val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
consumer.commitSync(Map((tp, syncMetadata)).asJava)
assertEquals(syncMetadata, consumer.committed(tp))

View File

@ -18,7 +18,7 @@
package kafka.api
import java.lang.{Long => JLong}
import java.util.Properties
import java.util.{Optional, Properties}
import java.util.concurrent.TimeUnit
import kafka.integration.KafkaServerTestHarness
@ -364,6 +364,31 @@ class TransactionsTest extends KafkaServerTestHarness {
}
}
@Test
def testOffsetMetadataInSendOffsetsToTransaction() = {
val tp = new TopicPartition(topic1, 0)
val groupId = "group"
val producer = transactionalProducers.head
val consumer = createReadCommittedConsumer(groupId)
consumer.subscribe(List(topic1).asJava)
producer.initTransactions()
producer.beginTransaction()
val offsetAndMetadata = new OffsetAndMetadata(110L, Optional.of(15), "some metadata")
producer.sendOffsetsToTransaction(Map(tp -> offsetAndMetadata).asJava, groupId)
producer.commitTransaction() // ok
// The call to commit the transaction may return before all markers are visible, so we initialize a second
// producer to ensure the transaction completes and the committed offsets are visible.
val producer2 = transactionalProducers(1)
producer2.initTransactions()
assertEquals(offsetAndMetadata, consumer.committed(tp))
}
@Test
def testFencingOnSend() {
val producer1 = transactionalProducers(0)
@ -434,7 +459,7 @@ class TransactionsTest extends KafkaServerTestHarness {
val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
val recordMetadata = result.get()
error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
servers.foreach { case (server) =>
servers.foreach { server =>
error(s"log dirs: ${server.logManager.liveLogDirs.map(_.getAbsolutePath).head}")
}
fail("Should not be able to send messages from a fenced producer.")

View File

@ -17,24 +17,25 @@
package kafka.coordinator.group
import java.util.concurrent.{ ConcurrentHashMap, TimeUnit }
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import kafka.common.OffsetAndMetadata
import kafka.coordinator.AbstractCoordinatorConcurrencyTest
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
import kafka.server.{ DelayedOperationPurgatory, KafkaConfig }
import kafka.server.{DelayedOperationPurgatory, KafkaConfig}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.JoinGroupRequest
import org.apache.kafka.common.utils.Time
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{ After, Before, Test }
import org.junit.{After, Before, Test}
import scala.collection._
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future, Promise, TimeoutException }
import scala.concurrent.{Await, Future, Promise, TimeoutException}
class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest[GroupMember] {
@ -211,7 +212,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
}
override def runWithCallback(member: GroupMember, responseCallback: CommitOffsetCallback): Unit = {
val tp = new TopicPartition("topic", 0)
val offsets = immutable.Map(tp -> OffsetAndMetadata(1))
val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds()))
groupCoordinator.handleCommitOffsets(member.groupId, member.memberId, member.generationId,
offsets, responseCallback)
}
@ -224,7 +225,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
class CommitTxnOffsetsOperation extends CommitOffsetsOperation {
override def runWithCallback(member: GroupMember, responseCallback: CommitOffsetCallback): Unit = {
val tp = new TopicPartition("topic", 0)
val offsets = immutable.Map(tp -> OffsetAndMetadata(1))
val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds()))
val producerId = 1000L
val producerEpoch : Short = 2
// When transaction offsets are appended to the log, transactions may be scheduled for
@ -309,4 +310,5 @@ object GroupCoordinatorConcurrencyTest {
@volatile var generationId: Int = -1
def groupId: String = group.groupId
}
}

View File

@ -17,6 +17,8 @@
package kafka.coordinator.group
import java.util.Optional
import kafka.common.OffsetAndMetadata
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
import kafka.utils._
@ -138,7 +140,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val topicPartition = new TopicPartition("foo", 0)
var offsetCommitErrors = Map.empty[TopicPartition, Errors]
groupCoordinator.handleCommitOffsets(otherGroupId, memberId, 1,
Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result })
Map(topicPartition -> offsetAndMetadata(15L)), result => { offsetCommitErrors = result })
assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), offsetCommitErrors.get(topicPartition))
// Heartbeat
@ -436,7 +438,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val sessionTimeout = 1000
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val offset = offsetAndMetadata(0)
val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols,
rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
@ -818,7 +820,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testCommitOffsetFromUnknownGroup() {
val generationId = 1
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val offset = offsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tp -> offset))
assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
@ -827,7 +829,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testCommitOffsetWithDefaultGeneration() {
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val offset = offsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
@ -854,7 +856,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// The simple offset commit should now fail
EasyMock.reset(replicaManager)
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val offset = offsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
@ -865,17 +867,25 @@ class GroupCoordinatorTest extends JUnitSuite {
}
@Test
def testFetchOffsets() {
def testFetchOffsets(): Unit = {
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val offset = 97L
val metadata = "some metadata"
val leaderEpoch = Optional.of[Integer](15)
val offsetAndMetadata = OffsetAndMetadata(offset, leaderEpoch, metadata, timer.time.milliseconds())
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offsetAndMetadata))
assertEquals(Errors.NONE, commitOffsetResult(tp))
val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
assertEquals(Errors.NONE, error)
assertEquals(Some(0), partitionData.get(tp).map(_.offset))
val maybePartitionData = partitionData.get(tp)
assertTrue(maybePartitionData.isDefined)
assertEquals(offset, maybePartitionData.get.offset)
assertEquals(metadata, maybePartitionData.get.metadata)
assertEquals(leaderEpoch, maybePartitionData.get.leaderEpoch)
}
@Test
@ -884,7 +894,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// To allow inspection and removal of the empty group, we must also support DescribeGroups and DeleteGroups
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val offset = offsetAndMetadata(0)
val groupId = ""
val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
@ -919,7 +929,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testBasicFetchTxnOffsets() {
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
@ -946,7 +956,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testFetchTxnOffsetsWithAbort() {
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
@ -970,7 +980,7 @@ class GroupCoordinatorTest extends JUnitSuite {
@Test
def testFetchTxnOffsetsIgnoreSpuriousCommit() {
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
@ -1003,7 +1013,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// Marker for only one partition is received. That commit should be materialized while the other should not.
val partitions = List(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))
val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15))
val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15))
val producerId = 1000L
val producerEpoch: Short = 3
@ -1082,7 +1092,7 @@ class GroupCoordinatorTest extends JUnitSuite {
// Each partition's offsets should be materialized when the corresponding producer's marker is received.
val partitions = List(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))
val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15))
val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15))
val producerIds = List(1000L, 1005L)
val producerEpochs: Seq[Short] = List(3, 4)
@ -1154,9 +1164,9 @@ class GroupCoordinatorTest extends JUnitSuite {
val tp1 = new TopicPartition("topic", 0)
val tp2 = new TopicPartition("topic", 1)
val tp3 = new TopicPartition("other-topic", 0)
val offset1 = OffsetAndMetadata(15)
val offset2 = OffsetAndMetadata(16)
val offset3 = OffsetAndMetadata(17)
val offset1 = offsetAndMetadata(15)
val offset2 = offsetAndMetadata(16)
val offset3 = offsetAndMetadata(17)
assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId))
@ -1179,7 +1189,7 @@ class GroupCoordinatorTest extends JUnitSuite {
def testCommitOffsetInCompletingRebalance() {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val offset = offsetAndMetadata(0)
val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
val assignedMemberId = joinGroupResult.memberId
@ -1425,7 +1435,7 @@ class GroupCoordinatorTest extends JUnitSuite {
EasyMock.reset(replicaManager)
val tp = new TopicPartition("topic", 0)
val offset = OffsetAndMetadata(0)
val offset = offsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, Map(tp -> offset))
assertEquals(Errors.NONE, commitOffsetResult(tp))
@ -1730,4 +1740,8 @@ class GroupCoordinatorTest extends JUnitSuite {
groupCoordinator.groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
}
private def offsetAndMetadata(offset: Long): OffsetAndMetadata = {
OffsetAndMetadata(offset, "", timer.time.milliseconds())
}
}

View File

@ -17,7 +17,7 @@
package kafka.coordinator.group
import kafka.api.{ApiVersion, KAFKA_1_1_IV0, KAFKA_2_1_IV0}
import kafka.api._
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
import kafka.log.{Log, LogAppendInfo}
@ -35,6 +35,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
import org.junit.{Before, Test}
import java.nio.ByteBuffer
import java.util.Optional
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge
@ -932,7 +933,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
expectAppendMessage(Errors.NONE)
EasyMock.replay(replicaManager)
@ -974,7 +975,8 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
val offsetAndMetadata = OffsetAndMetadata(offset, "", time.milliseconds())
val offsets = immutable.Map(topicPartition -> offsetAndMetadata)
val capturedResponseCallback = appendAndCaptureCallback()
EasyMock.replay(replicaManager)
@ -996,7 +998,7 @@ class GroupMetadataManagerTest {
group.completePendingTxnOffsetCommit(producerId, isCommit = true)
assertTrue(group.hasOffsets)
assertFalse(group.allOffsets.isEmpty)
assertEquals(Some(OffsetAndMetadata(offset)), group.offset(topicPartition))
assertEquals(Some(offsetAndMetadata), group.offset(topicPartition))
EasyMock.verify(replicaManager)
}
@ -1014,7 +1016,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
val capturedResponseCallback = appendAndCaptureCallback()
EasyMock.replay(replicaManager)
@ -1053,7 +1055,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
val capturedResponseCallback = appendAndCaptureCallback()
EasyMock.replay(replicaManager)
@ -1091,7 +1093,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
EasyMock.replay(replicaManager)
@ -1133,7 +1135,7 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
val capturedResponseCallback = appendAndCaptureCallback()
EasyMock.replay(replicaManager)
@ -1329,7 +1331,7 @@ class GroupMetadataManagerTest {
// expire the offset after 1 millisecond
val startMs = time.milliseconds
val offsets = immutable.Map(
topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
topicPartition1 -> OffsetAndMetadata(offset, Optional.empty(), "", startMs, Some(startMs + 1)),
topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
mockGetPartition()
@ -1633,7 +1635,7 @@ class GroupMetadataManagerTest {
)
val apiVersion = KAFKA_1_1_IV0
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, apiVersion = apiVersion, retentionTime = Some(100))
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, apiVersion = apiVersion, retentionTimeOpt = Some(100))
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
@ -1673,7 +1675,7 @@ class GroupMetadataManagerTest {
new TopicPartition("bar", 0) -> 8992L
)
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, retentionTime = Some(100))
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, retentionTimeOpt = Some(100))
val memberId = "98098230493"
val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
@ -1700,6 +1702,70 @@ class GroupMetadataManagerTest {
}
}
@Test
def testSerdeOffsetCommitValue(): Unit = {
val offsetAndMetadata = OffsetAndMetadata(
offset = 537L,
leaderEpoch = Optional.of(15),
metadata = "metadata",
commitTimestamp = time.milliseconds(),
expireTimestamp = None)
def verifySerde(apiVersion: ApiVersion, expectedOffsetCommitValueVersion: Int): Unit = {
val bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
val buffer = ByteBuffer.wrap(bytes)
assertEquals(expectedOffsetCommitValueVersion, buffer.getShort(0).toInt)
val deserializedOffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
assertEquals(offsetAndMetadata.offset, deserializedOffsetAndMetadata.offset)
assertEquals(offsetAndMetadata.metadata, deserializedOffsetAndMetadata.metadata)
assertEquals(offsetAndMetadata.commitTimestamp, deserializedOffsetAndMetadata.commitTimestamp)
// Serialization drops the leader epoch silently if an older inter-broker protocol is in use
val expectedLeaderEpoch = if (expectedOffsetCommitValueVersion >= 3)
offsetAndMetadata.leaderEpoch
else
Optional.empty()
assertEquals(expectedLeaderEpoch, deserializedOffsetAndMetadata.leaderEpoch)
}
for (version <- ApiVersion.allVersions) {
val expectedSchemaVersion = version match {
case v if v < KAFKA_2_1_IV0 => 1
case v if v < KAFKA_2_1_IV1 => 2
case _ => 3
}
verifySerde(version, expectedSchemaVersion)
}
}
@Test
def testSerdeOffsetCommitValueWithExpireTimestamp(): Unit = {
// If expire timestamp is set, we should always use version 1 of the offset commit
// value schema since later versions do not support it
val offsetAndMetadata = OffsetAndMetadata(
offset = 537L,
leaderEpoch = Optional.empty(),
metadata = "metadata",
commitTimestamp = time.milliseconds(),
expireTimestamp = Some(time.milliseconds() + 1000))
def verifySerde(apiVersion: ApiVersion): Unit = {
val bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
val buffer = ByteBuffer.wrap(bytes)
assertEquals(1, buffer.getShort(0).toInt)
val deserializedOffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
assertEquals(offsetAndMetadata, deserializedOffsetAndMetadata)
}
for (version <- ApiVersion.allVersions)
verifySerde(version)
}
private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse] => Unit] = {
val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
@ -1804,14 +1870,15 @@ class GroupMetadataManagerTest {
private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
groupId: String = groupId,
apiVersion: ApiVersion = ApiVersion.latestVersion,
retentionTime: Option[Long] = None): Seq[SimpleRecord] = {
retentionTimeOpt: Option[Long] = None): Seq[SimpleRecord] = {
committedOffsets.map { case (topicPartition, offset) =>
val offsetAndMetadata = retentionTime match {
case Some(timestamp) =>
val commitTimestamp = time.milliseconds()
OffsetAndMetadata(offset, "", commitTimestamp, commitTimestamp + timestamp)
val commitTimestamp = time.milliseconds()
val offsetAndMetadata = retentionTimeOpt match {
case Some(retentionTimeMs) =>
val expirationTime = commitTimestamp + retentionTimeMs
OffsetAndMetadata(offset, "", commitTimestamp, expirationTime)
case None =>
OffsetAndMetadata(offset)
OffsetAndMetadata(offset, "", commitTimestamp)
}
val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)

View File

@ -293,7 +293,7 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testOffsetCommit(): Unit = {
val partition = new TopicPartition("foo", 0)
val offset = OffsetAndMetadata(37)
val offset = offsetAndMetadata(37)
val commitRecordOffset = 3
group.prepareOffsetCommit(Map(partition -> offset))
@ -308,7 +308,7 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testOffsetCommitFailure(): Unit = {
val partition = new TopicPartition("foo", 0)
val offset = OffsetAndMetadata(37)
val offset = offsetAndMetadata(37)
group.prepareOffsetCommit(Map(partition -> offset))
assertTrue(group.hasOffsets)
@ -322,8 +322,8 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testOffsetCommitFailureWithAnotherPending(): Unit = {
val partition = new TopicPartition("foo", 0)
val firstOffset = OffsetAndMetadata(37)
val secondOffset = OffsetAndMetadata(57)
val firstOffset = offsetAndMetadata(37)
val secondOffset = offsetAndMetadata(57)
group.prepareOffsetCommit(Map(partition -> firstOffset))
assertTrue(group.hasOffsets)
@ -344,8 +344,8 @@ class GroupMetadataTest extends JUnitSuite {
@Test
def testOffsetCommitWithAnotherPending(): Unit = {
val partition = new TopicPartition("foo", 0)
val firstOffset = OffsetAndMetadata(37)
val secondOffset = OffsetAndMetadata(57)
val firstOffset = offsetAndMetadata(37)
val secondOffset = offsetAndMetadata(57)
group.prepareOffsetCommit(Map(partition -> firstOffset))
assertTrue(group.hasOffsets)
@ -367,8 +367,8 @@ class GroupMetadataTest extends JUnitSuite {
def testConsumerBeatsTransactionalOffsetCommit(): Unit = {
val partition = new TopicPartition("foo", 0)
val producerId = 13232L
val txnOffsetCommit = OffsetAndMetadata(37)
val consumerOffsetCommit = OffsetAndMetadata(57)
val txnOffsetCommit = offsetAndMetadata(37)
val consumerOffsetCommit = offsetAndMetadata(57)
group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
assertTrue(group.hasOffsets)
@ -392,8 +392,8 @@ class GroupMetadataTest extends JUnitSuite {
def testTransactionBeatsConsumerOffsetCommit(): Unit = {
val partition = new TopicPartition("foo", 0)
val producerId = 13232L
val txnOffsetCommit = OffsetAndMetadata(37)
val consumerOffsetCommit = OffsetAndMetadata(57)
val txnOffsetCommit = offsetAndMetadata(37)
val consumerOffsetCommit = offsetAndMetadata(57)
group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
assertTrue(group.hasOffsets)
@ -419,8 +419,8 @@ class GroupMetadataTest extends JUnitSuite {
def testTransactionalCommitIsAbortedAndConsumerCommitWins(): Unit = {
val partition = new TopicPartition("foo", 0)
val producerId = 13232L
val txnOffsetCommit = OffsetAndMetadata(37)
val consumerOffsetCommit = OffsetAndMetadata(57)
val txnOffsetCommit = offsetAndMetadata(37)
val consumerOffsetCommit = offsetAndMetadata(57)
group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
assertTrue(group.hasOffsets)
@ -447,7 +447,7 @@ class GroupMetadataTest extends JUnitSuite {
def testFailedTxnOffsetCommitLeavesNoPendingState(): Unit = {
val partition = new TopicPartition("foo", 0)
val producerId = 13232L
val txnOffsetCommit = OffsetAndMetadata(37)
val txnOffsetCommit = offsetAndMetadata(37)
group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId))
@ -471,4 +471,9 @@ class GroupMetadataTest extends JUnitSuite {
}
assertTrue(group.is(targetState))
}
private def offsetAndMetadata(offset: Long): OffsetAndMetadata = {
OffsetAndMetadata(offset, "", Time.SYSTEM.milliseconds())
}
}