From ecfccb480bd57f5195ea67d23702710014ec0e1a Mon Sep 17 00:00:00 2001
From: dengziming
Date: Fri, 9 Jul 2021 01:13:23 +0800
Subject: [PATCH] KAFKA-12660; Do not update offset commit sensor after append
 failure (#10560)

Do not update the offset commit sensor if the append to the log failed.
The patch also adds two unit tests: the first covers the
`OFFSET_METADATA_TOO_LARGE` error, and the second covers the case where
one offset commits successfully while another fails with
`OFFSET_METADATA_TOO_LARGE`. Neither case was covered previously.

Reviewers: Jason Gustafson
---
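Note (illustrative, not part of the change itself): the essence of the fix
is that the sensor is recorded only once the append is known to have
succeeded. A minimal self-contained Scala sketch; the object and method
names (`SensorAfterAppendSketch`, `onAppendComplete`) are made up, and a
plain `CumulativeSum` stands in for the coordinator's `Meter`:

    import org.apache.kafka.common.metrics.Metrics
    import org.apache.kafka.common.metrics.stats.CumulativeSum
    import org.apache.kafka.common.protocol.Errors

    object SensorAfterAppendSketch extends App {
      val metrics = new Metrics()
      val offsetCommitsSensor = metrics.sensor("OffsetCommits")
      offsetCommitsSensor.add(
        metrics.metricName("offset-commit-count", "group-coordinator-metrics"),
        new CumulativeSum())

      // Record the sensor only after a successful append, as the fix below does.
      def onAppendComplete(error: Errors, committedOffsets: Int): Unit =
        if (error == Errors.NONE)
          offsetCommitsSensor.record(committedOffsets)

      onAppendComplete(Errors.NONE, 2)                      // counted
      onAppendComplete(Errors.OFFSET_METADATA_TOO_LARGE, 1) // not counted
    }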
 .../group/GroupMetadataManager.scala       | 23 ++---
 .../group/GroupMetadataManagerTest.scala   | 97 ++++++++++++++++++-
 .../scala/unit/kafka/utils/TestUtils.scala |  9 +-
 3 files changed, 111 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 78d61eedec0..9e3769b6239 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -38,7 +38,7 @@ import kafka.utils._
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.{Metrics, Sensor}
 import org.apache.kafka.common.metrics.stats.{Avg, Max, Meter}
 import org.apache.kafka.common.protocol.{ByteBufferAccessor, Errors, MessageUtil}
 import org.apache.kafka.common.record._
@@ -99,7 +99,7 @@ class GroupMetadataManager(brokerId: Int,
     GroupMetadataManager.MetricsGroup,
     "The avg time it took to load the partitions in the last 30sec"), new Avg())
 
-  val offsetCommitsSensor = metrics.sensor("OffsetCommits")
+  val offsetCommitsSensor: Sensor = metrics.sensor("OffsetCommits")
 
   offsetCommitsSensor.add(new Meter(
     metrics.metricName("offset-commit-rate",
@@ -109,7 +109,7 @@ class GroupMetadataManager(brokerId: Int,
       "group-coordinator-metrics",
       "The total number of committed offsets")))
 
-  val offsetExpiredSensor = metrics.sensor("OffsetExpired")
+  val offsetExpiredSensor: Sensor = metrics.sensor("OffsetExpired")
 
   offsetExpiredSensor.add(new Meter(
     metrics.metricName("offset-expiration-rate",
@@ -184,9 +184,9 @@ class GroupMetadataManager(brokerId: Int,
 
   def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
 
-  def isPartitionOwned(partition: Int) = inLock(partitionLock) { ownedPartitions.contains(partition) }
+  def isPartitionOwned(partition: Int): Boolean = inLock(partitionLock) { ownedPartitions.contains(partition) }
 
-  def isPartitionLoading(partition: Int) = inLock(partitionLock) { loadingPartitions.contains(partition) }
+  def isPartitionLoading(partition: Int): Boolean = inLock(partitionLock) { loadingPartitions.contains(partition) }
 
   def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
 
@@ -197,7 +197,7 @@ class GroupMetadataManager(brokerId: Int,
   def isLoading: Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
 
   // return true iff group is owned and the group doesn't exist
-  def groupNotExists(groupId: String) = inLock(partitionLock) {
+  def groupNotExists(groupId: String): Boolean = inLock(partitionLock) {
     isGroupLocal(groupId) && getGroup(groupId).forall { group =>
       group.inLock(group.is(Dead))
     }
@@ -397,9 +397,6 @@ class GroupMetadataManager(brokerId: Int,
         throw new IllegalStateException("Append status %s should only have one partition %s"
           .format(responseStatus, offsetTopicPartition))
 
-      // record the number of offsets committed to the log
-      offsetCommitsSensor.record(records.size)
-
       // construct the commit response status and insert
       // the offset and metadata to cache if the append status has no error
       val status = responseStatus(offsetTopicPartition)
@@ -414,6 +411,10 @@ class GroupMetadataManager(brokerId: Int,
                 group.onOffsetCommitAppend(topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
             }
           }
+
+          // Record the number of offsets committed to the log
+          offsetCommitsSensor.record(records.size)
+
           Errors.NONE
         } else {
           if (!group.is(Dead)) {
@@ -954,11 +955,11 @@ class GroupMetadataManager(brokerId: Int,
 
   private def groupsBelongingToPartitions(producerId: Long, partitions: Set[Int]) = openGroupsForProducer synchronized {
     val (ownedGroups, _) = openGroupsForProducer.getOrElse(producerId, mutable.Set.empty[String])
-      .partition { case (group) => partitions.contains(partitionFor(group)) }
+      .partition(group => partitions.contains(partitionFor(group)))
     ownedGroups
   }
 
-  private def removeGroupFromAllProducers(groupId: String) = openGroupsForProducer synchronized {
+  private def removeGroupFromAllProducers(groupId: String): Unit = openGroupsForProducer synchronized {
     openGroupsForProducer.forKeyValue { (_, groups) =>
       groups.remove(groupId)
     }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index d2d3b12ba93..bf475cc7589 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1045,7 +1045,7 @@ class GroupMetadataManagerTest {
 
     val e = assertThrows(classOf[IllegalStateException],
       () => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
-    assertEquals(s"Unknown group metadata message version: ${unsupportedVersion}", e.getMessage)
+    assertEquals(s"Unknown group metadata message version: $unsupportedVersion", e.getMessage)
   }
 
   @Test
@@ -1260,6 +1260,7 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
     groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
     assertTrue(group.hasOffsets)
 
@@ -1277,6 +1278,8 @@ class GroupMetadataManagerTest {
     assertEquals(offset, partitionResponse.offset)
 
     EasyMock.verify(replicaManager)
+    // The sensor is updated only after a successful commit
+    assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
   @Test
@@ -1462,6 +1465,7 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
     groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
     assertTrue(group.hasOffsets)
     capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
@@ -1476,6 +1480,89 @@ class GroupMetadataManagerTest {
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset))
 
     EasyMock.verify(replicaManager)
+    // The sensor is not updated when the commit fails
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+  }
+
+  @Test
+  def testCommitOffsetPartialFailure(): Unit = {
+    EasyMock.reset(replicaManager)
+
+    val memberId = ""
+    val topicPartition = new TopicPartition("foo", 0)
+    val topicPartitionFailed = new TopicPartition("foo", 1)
+    val offset = 37
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsets = immutable.Map(
+      topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()),
+      // This commit will fail
+      topicPartitionFailed -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds())
+    )
+
+    val capturedResponseCallback = appendAndCaptureCallback()
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
+    assertTrue(group.hasOffsets)
+    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+
+    assertFalse(commitErrors.isEmpty)
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition))
+    assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), commitErrors.get.get(topicPartitionFailed))
+    assertTrue(group.hasOffsets)
+
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, defaultRequireStable, Some(Seq(topicPartition, topicPartitionFailed)))
+    assertEquals(Some(offset), cachedOffsets.get(topicPartition).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartitionFailed).map(_.offset))
+
+    EasyMock.verify(replicaManager)
+    assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+  }
+
+  @Test
+  def testOffsetMetadataTooLarge(): Unit = {
+    val memberId = ""
+    val topicPartition = new TopicPartition("foo", 0)
+    val offset = 37
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val offsets = immutable.Map(
+      topicPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds())
+    )
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+      commitErrors = Some(errors)
+    }
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
+    assertFalse(group.hasOffsets)
+
+    assertFalse(commitErrors.isEmpty)
+    val maybeError = commitErrors.get.get(topicPartition)
+    assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), maybeError)
+    assertFalse(group.hasOffsets)
+
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, defaultRequireStable, Some(Seq(topicPartition)))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset))
+
+    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
 
   @Test
@@ -1900,7 +1987,7 @@ class GroupMetadataManagerTest {
     assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
 
     // do not expire offsets while within retention period since commit timestamp
-    val expiryTimestamp = offsets.get(topicPartition1).get.commitTimestamp + defaultOffsetRetentionMs
+    val expiryTimestamp = offsets(topicPartition1).commitTimestamp + defaultOffsetRetentionMs
     time.sleep(expiryTimestamp - time.milliseconds() - 1)
 
     groupMetadataManager.cleanupGroupMetadata()
@@ -2074,7 +2161,7 @@ class GroupMetadataManagerTest {
   }
 
   @Test
-  def testLoadOffsetFromOldCommit() = {
+  def testLoadOffsetFromOldCommit(): Unit = {
     val groupMetadataTopicPartition = groupTopicPartition
     val generation = 935
     val protocolType = "consumer"
@@ -2116,7 +2203,7 @@ class GroupMetadataManagerTest {
   }
 
   @Test
-  def testLoadOffsetWithExplicitRetention() = {
+  def testLoadOffsetWithExplicitRetention(): Unit = {
     val groupMetadataTopicPartition = groupTopicPartition
     val generation = 935
     val protocolType = "consumer"
@@ -2391,7 +2478,7 @@ class GroupMetadataManagerTest {
       EasyMock.anyObject(),
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
-      override def answer = capturedCallback.getValue.apply(
+      override def answer: Unit = capturedCallback.getValue.apply(
         Map(groupTopicPartition ->
           new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
         )
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f395a3d753d..51ea34aab4d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -26,8 +26,8 @@ import java.time.Duration
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
 import java.util.{Arrays, Collections, Properties}
-
 import com.yammer.metrics.core.Meter
+
 import javax.net.ssl.X509TrustManager
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
@@ -51,6 +51,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPart
 import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
@@ -1710,7 +1711,11 @@ object TestUtils extends Logging {
   }
 
   def totalMetricValue(server: KafkaServer, metricName: String): Long = {
-    val allMetrics = server.metrics.metrics
+    totalMetricValue(server.metrics, metricName)
+  }
+
+  def totalMetricValue(metrics: Metrics, metricName: String): Long = {
+    val allMetrics = metrics.metrics
     val total = allMetrics.values().asScala.filter(_.metricName().name() == metricName)
       .foldLeft(0.0)((total, metric) => total + metric.metricValue.asInstanceOf[Double])
     total.toLong
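
Usage sketch for the new `TestUtils.totalMetricValue(Metrics, String)`
overload (a hypothetical standalone fragment, not part of the commit; the
object name is made up, and it assumes core test scope so that
`kafka.utils.TestUtils` is on the classpath):

    import kafka.utils.TestUtils
    import org.apache.kafka.common.metrics.Metrics
    import org.apache.kafka.common.metrics.stats.CumulativeSum

    object TotalMetricValueSketch extends App {
      val metrics = new Metrics()
      val sensor = metrics.sensor("OffsetCommits")
      sensor.add(
        metrics.metricName("offset-commit-count", "group-coordinator-metrics"),
        new CumulativeSum())
      sensor.record(3.0)

      // Sums every metric named "offset-commit-count" across the registry;
      // no KafkaServer is needed, which is what the new tests rely on.
      assert(TestUtils.totalMetricValue(metrics, "offset-commit-count") == 3L)
    }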