KAFKA-12660; Do not update offset commit sensor after append failure (#10560)

Do not update the offset commit sensor when the commit fails. The patch also adds two unit tests: one for the `OFFSET_METADATA_TOO_LARGE` error on its own, and one for the case where one offset commits successfully while another fails with `OFFSET_METADATA_TOO_LARGE`. Neither case was covered before.

Reviewers: Jason Gustafson <jason@confluent.io>
dengziming 2021-07-09 01:13:23 +08:00 committed by GitHub
parent 5a88a59ddd
commit ecfccb480b
3 changed files with 111 additions and 18 deletions
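
For context, here is a minimal, self-contained sketch (not part of the patch; the object name OffsetCommitSensorSketch, onAppendComplete, and its parameters are invented for illustration) of the behaviour the change enforces: the OffsetCommits sensor is recorded only after the append to the offsets topic succeeds, so a failed commit no longer inflates offset-commit-count.

import scala.jdk.CollectionConverters._

import org.apache.kafka.common.metrics.{Metrics, Sensor}
import org.apache.kafka.common.metrics.stats.CumulativeSum

object OffsetCommitSensorSketch {
  def main(args: Array[String]): Unit = {
    val metrics = new Metrics()
    val offsetCommitsSensor: Sensor = metrics.sensor("OffsetCommits")
    offsetCommitsSensor.add(
      metrics.metricName("offset-commit-count", "group-coordinator-metrics"),
      new CumulativeSum())

    // Stand-in for the append callback in GroupMetadataManager.storeOffsets:
    // the sensor is recorded only on the success path.
    def onAppendComplete(appendSucceeded: Boolean, numOffsets: Int): Unit = {
      if (appendSucceeded) {
        offsetCommitsSensor.record(numOffsets)
      }
      // On failure the sensor is left untouched, so offset-commit-count
      // reflects only offsets that were actually written to the log.
    }

    onAppendComplete(appendSucceeded = true, numOffsets = 1)
    onAppendComplete(appendSucceeded = false, numOffsets = 1)

    val total = metrics.metrics.values.asScala
      .filter(_.metricName.name == "offset-commit-count")
      .map(_.metricValue.asInstanceOf[Double])
      .sum
    println(total) // 1.0 -- the failed commit was not counted
    metrics.close()
  }
}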

GroupMetadataManager.scala

@@ -38,7 +38,7 @@ import kafka.utils._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.{Metrics, Sensor}
import org.apache.kafka.common.metrics.stats.{Avg, Max, Meter}
import org.apache.kafka.common.protocol.{ByteBufferAccessor, Errors, MessageUtil}
import org.apache.kafka.common.record._
@@ -99,7 +99,7 @@ class GroupMetadataManager(brokerId: Int,
GroupMetadataManager.MetricsGroup,
"The avg time it took to load the partitions in the last 30sec"), new Avg())
-val offsetCommitsSensor = metrics.sensor("OffsetCommits")
+val offsetCommitsSensor: Sensor = metrics.sensor("OffsetCommits")
offsetCommitsSensor.add(new Meter(
metrics.metricName("offset-commit-rate",
@@ -109,7 +109,7 @@ class GroupMetadataManager(brokerId: Int,
"group-coordinator-metrics",
"The total number of committed offsets")))
-val offsetExpiredSensor = metrics.sensor("OffsetExpired")
+val offsetExpiredSensor: Sensor = metrics.sensor("OffsetExpired")
offsetExpiredSensor.add(new Meter(
metrics.metricName("offset-expiration-rate",
@@ -184,9 +184,9 @@ class GroupMetadataManager(brokerId: Int,
def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
-def isPartitionOwned(partition: Int) = inLock(partitionLock) { ownedPartitions.contains(partition) }
+def isPartitionOwned(partition: Int): Boolean = inLock(partitionLock) { ownedPartitions.contains(partition) }
-def isPartitionLoading(partition: Int) = inLock(partitionLock) { loadingPartitions.contains(partition) }
+def isPartitionLoading(partition: Int): Boolean = inLock(partitionLock) { loadingPartitions.contains(partition) }
def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
@@ -197,7 +197,7 @@ class GroupMetadataManager(brokerId: Int,
def isLoading: Boolean = inLock(partitionLock) { loadingPartitions.nonEmpty }
// return true iff group is owned and the group doesn't exist
-def groupNotExists(groupId: String) = inLock(partitionLock) {
+def groupNotExists(groupId: String): Boolean = inLock(partitionLock) {
isGroupLocal(groupId) && getGroup(groupId).forall { group =>
group.inLock(group.is(Dead))
}
@@ -397,9 +397,6 @@ class GroupMetadataManager(brokerId: Int,
throw new IllegalStateException("Append status %s should only have one partition %s"
.format(responseStatus, offsetTopicPartition))
-// record the number of offsets committed to the log
-offsetCommitsSensor.record(records.size)
// construct the commit response status and insert
// the offset and metadata to cache if the append status has no error
val status = responseStatus(offsetTopicPartition)
@@ -414,6 +411,10 @@
group.onOffsetCommitAppend(topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
}
}
+// Record the number of offsets committed to the log
+offsetCommitsSensor.record(records.size)
Errors.NONE
} else {
if (!group.is(Dead)) {
@@ -954,11 +955,11 @@ class GroupMetadataManager(brokerId: Int,
private def groupsBelongingToPartitions(producerId: Long, partitions: Set[Int]) = openGroupsForProducer synchronized {
val (ownedGroups, _) = openGroupsForProducer.getOrElse(producerId, mutable.Set.empty[String])
-.partition { case (group) => partitions.contains(partitionFor(group)) }
+.partition(group => partitions.contains(partitionFor(group)))
ownedGroups
}
-private def removeGroupFromAllProducers(groupId: String) = openGroupsForProducer synchronized {
+private def removeGroupFromAllProducers(groupId: String): Unit = openGroupsForProducer synchronized {
openGroupsForProducer.forKeyValue { (_, groups) =>
groups.remove(groupId)
}

GroupMetadataManagerTest.scala

@@ -1045,7 +1045,7 @@ class GroupMetadataManagerTest {
val e = assertThrows(classOf[IllegalStateException],
() => GroupMetadataManager.readGroupMessageValue(groupId, groupMetadataRecordValue, time))
-assertEquals(s"Unknown group metadata message version: ${unsupportedVersion}", e.getMessage)
+assertEquals(s"Unknown group metadata message version: $unsupportedVersion", e.getMessage)
}
@Test
@@ -1260,6 +1260,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
+assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
assertTrue(group.hasOffsets)
@@ -1277,6 +1278,8 @@ class GroupMetadataManagerTest {
assertEquals(offset, partitionResponse.offset)
EasyMock.verify(replicaManager)
+// Will update sensor after commit
+assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
}
@Test
@@ -1462,6 +1465,7 @@ class GroupMetadataManagerTest {
commitErrors = Some(errors)
}
+assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
assertTrue(group.hasOffsets)
capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
@@ -1476,6 +1480,89 @@ class GroupMetadataManagerTest {
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset))
EasyMock.verify(replicaManager)
+// Will not update sensor if failed
+assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
}
+@Test
+def testCommitOffsetPartialFailure(): Unit = {
+EasyMock.reset(replicaManager)
+val memberId = ""
+val topicPartition = new TopicPartition("foo", 0)
+val topicPartitionFailed = new TopicPartition("foo", 1)
+val offset = 37
+groupMetadataManager.addPartitionOwnership(groupPartitionId)
+val group = new GroupMetadata(groupId, Empty, time)
+groupMetadataManager.addGroup(group)
+val offsets = immutable.Map(
+topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()),
+// This one will fail
+topicPartitionFailed -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds())
+)
+val capturedResponseCallback = appendAndCaptureCallback()
+EasyMock.replay(replicaManager)
+var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+commitErrors = Some(errors)
+}
+assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
+assertTrue(group.hasOffsets)
+capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
+new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
+assertFalse(commitErrors.isEmpty)
+assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition))
+assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), commitErrors.get.get(topicPartitionFailed))
+assertTrue(group.hasOffsets)
+val cachedOffsets = groupMetadataManager.getOffsets(groupId, defaultRequireStable, Some(Seq(topicPartition, topicPartitionFailed)))
+assertEquals(Some(offset), cachedOffsets.get(topicPartition).map(_.offset))
+assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartitionFailed).map(_.offset))
+EasyMock.verify(replicaManager)
+assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+}
+@Test
+def testOffsetMetadataTooLarge(): Unit = {
+val memberId = ""
+val topicPartition = new TopicPartition("foo", 0)
+val offset = 37
+groupMetadataManager.addPartitionOwnership(groupPartitionId)
+val group = new GroupMetadata(groupId, Empty, time)
+groupMetadataManager.addGroup(group)
+val offsets = immutable.Map(
+topicPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds())
+)
+var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+commitErrors = Some(errors)
+}
+assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
+assertFalse(group.hasOffsets)
+assertFalse(commitErrors.isEmpty)
+val maybeError = commitErrors.get.get(topicPartition)
+assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), maybeError)
+assertFalse(group.hasOffsets)
+val cachedOffsets = groupMetadataManager.getOffsets(groupId, defaultRequireStable, Some(Seq(topicPartition)))
+assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset))
+assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
+}
@Test
@@ -1900,7 +1987,7 @@ class GroupMetadataManagerTest {
assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
// do not expire offsets while within retention period since commit timestamp
-val expiryTimestamp = offsets.get(topicPartition1).get.commitTimestamp + defaultOffsetRetentionMs
+val expiryTimestamp = offsets(topicPartition1).commitTimestamp + defaultOffsetRetentionMs
time.sleep(expiryTimestamp - time.milliseconds() - 1)
groupMetadataManager.cleanupGroupMetadata()
@@ -2074,7 +2161,7 @@ class GroupMetadataManagerTest {
}
@Test
-def testLoadOffsetFromOldCommit() = {
+def testLoadOffsetFromOldCommit(): Unit = {
val groupMetadataTopicPartition = groupTopicPartition
val generation = 935
val protocolType = "consumer"
@@ -2116,7 +2203,7 @@ class GroupMetadataManagerTest {
}
@Test
-def testLoadOffsetWithExplicitRetention() = {
+def testLoadOffsetWithExplicitRetention(): Unit = {
val groupMetadataTopicPartition = groupTopicPartition
val generation = 935
val protocolType = "consumer"
@@ -2391,7 +2478,7 @@ class GroupMetadataManagerTest {
EasyMock.anyObject(),
EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
-override def answer = capturedCallback.getValue.apply(
+override def answer: Unit = capturedCallback.getValue.apply(
Map(groupTopicPartition ->
new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L)
)

TestUtils.scala

@@ -26,8 +26,8 @@ import java.time.Duration
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import java.util.{Arrays, Collections, Properties}
import com.yammer.metrics.core.Meter
import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
@@ -51,6 +51,7 @@ import org.apache.kafka.common.errors.{KafkaStorageException, UnknownTopicOrPart
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
@@ -1710,7 +1711,11 @@ object TestUtils extends Logging {
}
def totalMetricValue(server: KafkaServer, metricName: String): Long = {
-val allMetrics = server.metrics.metrics
+totalMetricValue(server.metrics, metricName)
+}
+def totalMetricValue(metrics: Metrics, metricName: String): Long = {
+val allMetrics = metrics.metrics
val total = allMetrics.values().asScala.filter(_.metricName().name() == metricName)
.foldLeft(0.0)((total, metric) => total + metric.metricValue.asInstanceOf[Double])
total.toLong