KAFKA-5600; Fix group loading regression causing stale metadata/offset cache

the while loop was too big and need to be closed earlier
to see the fix, ignore whitespace since most of it is indentation

this bug was introduced by commit
5bd06f1d54

Author: Jan Burkhardt <jan.burkhardt@just.social>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3538 from bjrke/trunk
This commit is contained in:
Jan Burkhardt 2017-07-17 11:49:50 -07:00 committed by Jason Gustafson
parent 6d7a81b478
commit e2fe19d22a
2 changed files with 112 additions and 54 deletions

View File

@ -550,57 +550,57 @@ class GroupMetadataManager(brokerId: Int,
} }
currOffset = batch.nextOffset currOffset = batch.nextOffset
} }
val (groupOffsets, emptyGroupOffsets) = loadedOffsets
.groupBy(_._1.group)
.mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
.partition { case (group, _) => loadedGroups.contains(group) }
val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
pendingOffsets.foreach { case (producerId, producerOffsets) =>
producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
producerOffsets
.groupBy(_._1.group)
.mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)})
.foreach { case (group, offsets) =>
val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
groupProducerOffsets ++= offsets
}
}
val (pendingGroupOffsets, pendingEmptyGroupOffsets) = pendingOffsetsByGroup
.partition { case (group, _) => loadedGroups.contains(group)}
loadedGroups.values.foreach { group =>
val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
loadGroup(group, offsets, pendingOffsets)
onGroupLoaded(group)
}
// load groups which store offsets in kafka, but which have no active members and thus no group
// metadata stored in the log
(emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) =>
val group = new GroupMetadata(groupId)
val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
loadGroup(group, offsets, pendingOffsets)
onGroupLoaded(group)
}
removedGroups.foreach { groupId =>
// if the cache already contains a group which should be removed, raise an error. Note that it
// is possible (however unlikely) for a consumer group to be removed, and then to be used only for
// offset storage (i.e. by "simple" consumers)
if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
s"loading partition $topicPartition")
}
} }
val (groupOffsets, emptyGroupOffsets) = loadedOffsets
.groupBy(_._1.group)
.mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
.partition { case (group, _) => loadedGroups.contains(group) }
val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
pendingOffsets.foreach { case (producerId, producerOffsets) =>
producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
producerOffsets
.groupBy(_._1.group)
.mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)})
.foreach { case (group, offsets) =>
val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
groupProducerOffsets ++= offsets
}
}
val (pendingGroupOffsets, pendingEmptyGroupOffsets) = pendingOffsetsByGroup
.partition { case (group, _) => loadedGroups.contains(group)}
loadedGroups.values.foreach { group =>
val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
loadGroup(group, offsets, pendingOffsets)
onGroupLoaded(group)
}
// load groups which store offsets in kafka, but which have no active members and thus no group
// metadata stored in the log
(emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) =>
val group = new GroupMetadata(groupId)
val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
loadGroup(group, offsets, pendingOffsets)
onGroupLoaded(group)
}
removedGroups.foreach { groupId =>
// if the cache already contains a group which should be removed, raise an error. Note that it
// is possible (however unlikely) for a consumer group to be removed, and then to be used only for
// offset storage (i.e. by "simple" consumers)
if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
s"loading partition $topicPartition")
}
} }
} }

View File

@ -609,6 +609,51 @@ class GroupMetadataManagerTest {
} }
} }
@Test
def testLoadGroupAndOffsetsFromDifferentSegments(): Unit = {
val startOffset = 15L
val tp0 = new TopicPartition("foo", 0)
val tp1 = new TopicPartition("foo", 1)
val tp2 = new TopicPartition("bar", 0)
val tp3 = new TopicPartition("xxx", 0)
val logMock = EasyMock.mock(classOf[Log])
EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
val segment1MemberId = "a"
val segment1Offsets = Map(tp0 -> 23L, tp1 -> 455L, tp3 -> 42L)
val segment1Records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(segment1MemberId)): _*)
val segment1End = expectGroupMetadataLoad(logMock, startOffset, segment1Records)
val segment2MemberId = "b"
val segment2Offsets = Map(tp0 -> 33L, tp2 -> 8992L, tp3 -> 10L)
val segment2Records = MemoryRecords.withRecords(segment1End, CompressionType.NONE,
createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(segment2MemberId)): _*)
val segment2End = expectGroupMetadataLoad(logMock, segment1End, segment2Records)
EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(segment2End))
EasyMock.replay(logMock, replicaManager)
groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => ())
val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
assertEquals(groupId, group.groupId)
assertEquals(Stable, group.currentState)
assertEquals("segment2 group record member should be elected", segment2MemberId, group.leaderId)
assertEquals("segment2 group record member should be only member", Set(segment2MemberId), group.allMembers)
// offsets of segment1 should be overridden by segment2 offsets of the same topic partitions
val committedOffsets = segment1Offsets ++ segment2Offsets
assertEquals(committedOffsets.size, group.allOffsets.size)
committedOffsets.foreach { case (topicPartition, offset) =>
assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
}
}
@Test @Test
def testAddGroup() { def testAddGroup() {
val group = new GroupMetadata("foo") val group = new GroupMetadata("foo")
@ -1303,20 +1348,33 @@ class GroupMetadataManagerTest {
private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition, private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
startOffset: Long, startOffset: Long,
records: MemoryRecords): Unit = { records: MemoryRecords): Unit = {
val endOffset = startOffset + records.records.asScala.size
val logMock = EasyMock.mock(classOf[Log]) val logMock = EasyMock.mock(classOf[Log])
EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
val endOffset = expectGroupMetadataLoad(logMock, startOffset, records)
EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
EasyMock.replay(logMock)
}
/**
* mock records into a mocked log
*
* @return the calculated end offset to be mocked into [[ReplicaManager.getLogEndOffset]]
*/
private def expectGroupMetadataLoad(logMock: Log,
startOffset: Long,
records: MemoryRecords): Long = {
val endOffset = startOffset + records.records.asScala.size
val fileRecordsMock = EasyMock.mock(classOf[FileRecords]) val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset) EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None),
EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))) EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
.andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock)) .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt())) EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
.andReturn(records.buffer) .andReturn(records.buffer)
EasyMock.replay(fileRecordsMock)
EasyMock.replay(logMock, fileRecordsMock) endOffset
} }
private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long], private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],