mirror of https://github.com/apache/kafka.git
Compare commits
9 Commits
8973814894
...
45505f4719
Author | SHA1 | Date |
---|---|---|
|
45505f4719 | |
|
2938c4242e | |
|
121e934f7b | |
|
bfda2319c6 | |
|
b1c4108aa4 | |
|
9566c6429e | |
|
576d072707 | |
|
19a7864f62 | |
|
23476deddc |
|
@ -26,9 +26,9 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
|
|||
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
|
||||
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
|
||||
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
|
||||
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
|
||||
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
|
||||
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
||||
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
|
||||
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, StreamsGroupDescribeRequest, StreamsGroupDescribeResponse, StreamsGroupHeartbeatRequest, StreamsGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
|
||||
import org.apache.kafka.common.serialization.StringSerializer
|
||||
import org.apache.kafka.common.test.ClusterInstance
|
||||
import org.apache.kafka.common.utils.ProducerIdAndEpoch
|
||||
|
@ -768,6 +768,21 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
|
|||
shareGroupDescribeResponse.data.groups.asScala.toList
|
||||
}
|
||||
|
||||
protected def streamsGroupDescribe(
|
||||
groupIds: List[String],
|
||||
includeAuthorizedOperations: Boolean = false,
|
||||
version: Short = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
|
||||
): List[StreamsGroupDescribeResponseData.DescribedGroup] = {
|
||||
val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
|
||||
new StreamsGroupDescribeRequestData()
|
||||
.setGroupIds(groupIds.asJava)
|
||||
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
|
||||
).build(version)
|
||||
|
||||
val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
|
||||
streamsGroupDescribeResponse.data.groups.asScala.toList
|
||||
}
|
||||
|
||||
protected def heartbeat(
|
||||
groupId: String,
|
||||
generationId: Int,
|
||||
|
@ -855,6 +870,41 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
|
|||
shareGroupHeartbeatResponse.data
|
||||
}
|
||||
|
||||
protected def streamsGroupHeartbeat(
|
||||
groupId: String,
|
||||
memberId: String = "",
|
||||
memberEpoch: Int = 0,
|
||||
rebalanceTimeoutMs: Int = -1,
|
||||
activeTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
|
||||
standbyTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
|
||||
warmupTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
|
||||
topology: StreamsGroupHeartbeatRequestData.Topology = null,
|
||||
expectedError: Errors = Errors.NONE,
|
||||
version: Short = ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled)
|
||||
): StreamsGroupHeartbeatResponseData = {
|
||||
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder(
|
||||
new StreamsGroupHeartbeatRequestData()
|
||||
.setGroupId(groupId)
|
||||
.setMemberId(memberId)
|
||||
.setMemberEpoch(memberEpoch)
|
||||
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
|
||||
.setActiveTasks(activeTasks.asJava)
|
||||
.setStandbyTasks(standbyTasks.asJava)
|
||||
.setWarmupTasks(warmupTasks.asJava)
|
||||
.setTopology(topology)
|
||||
).build(version)
|
||||
|
||||
// Send the request until receiving a successful response. There is a delay
|
||||
// here because the group coordinator is loaded in the background.
|
||||
var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
|
||||
streamsGroupHeartbeatResponse.data.errorCode == expectedError.code
|
||||
}, msg = s"Could not heartbeat successfully. Last response $streamsGroupHeartbeatResponse.")
|
||||
|
||||
streamsGroupHeartbeatResponse.data
|
||||
}
|
||||
|
||||
protected def leaveGroupWithNewProtocol(
|
||||
groupId: String,
|
||||
memberId: String
|
||||
|
|
|
@ -0,0 +1,316 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package kafka.server
|
||||
|
||||
import kafka.utils.TestUtils
|
||||
import org.apache.kafka.common.message.{StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData}
|
||||
import org.apache.kafka.common.utils.Utils
|
||||
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
|
||||
import org.apache.kafka.common.requests.{StreamsGroupDescribeRequest, StreamsGroupDescribeResponse}
|
||||
import org.apache.kafka.common.resource.ResourceType
|
||||
import org.apache.kafka.common.test.ClusterInstance
|
||||
import org.apache.kafka.common.test.api._
|
||||
|
||||
import scala.jdk.CollectionConverters._
|
||||
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
|
||||
import org.apache.kafka.security.authorizer.AclEntry
|
||||
import org.apache.kafka.server.common.Feature
|
||||
import org.junit.Assert.{assertEquals, assertTrue}
|
||||
|
||||
import java.lang.{Byte => JByte}
|
||||
|
||||
@ClusterTestDefaults(
|
||||
types = Array(Type.KRAFT),
|
||||
brokers = 1,
|
||||
serverProperties = Array(
|
||||
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
|
||||
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
|
||||
)
|
||||
)
|
||||
class StreamsGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
|
||||
|
||||
@ClusterTest(
|
||||
features = Array(
|
||||
new ClusterFeature(feature = Feature.STREAMS_VERSION, version = 0)
|
||||
)
|
||||
)
|
||||
def testStreamsGroupDescribeWhenFeatureFlagNotEnabled(): Unit = {
|
||||
val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
|
||||
new StreamsGroupDescribeRequestData().setGroupIds(List("grp-mock-1", "grp-mock-2").asJava)
|
||||
).build(ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled))
|
||||
|
||||
val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
|
||||
val expectedResponse = new StreamsGroupDescribeResponseData()
|
||||
expectedResponse.groups().add(
|
||||
new StreamsGroupDescribeResponseData.DescribedGroup()
|
||||
.setGroupId("grp-mock-1")
|
||||
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
|
||||
)
|
||||
expectedResponse.groups().add(
|
||||
new StreamsGroupDescribeResponseData.DescribedGroup()
|
||||
.setGroupId("grp-mock-2")
|
||||
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
|
||||
)
|
||||
assertEquals(expectedResponse, streamsGroupDescribeResponse.data)
|
||||
}
|
||||
|
||||
@ClusterTest(
|
||||
serverProperties = Array(
|
||||
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer,streams"),
|
||||
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
|
||||
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
|
||||
)
|
||||
)
|
||||
def testStreamsGroupDescribeGroupsWithNewGroupCoordinator(): Unit = {
|
||||
// Creates the __consumer_offsets topics because it won't be created automatically
|
||||
// in this test because it does not use FindCoordinator API.
|
||||
createOffsetsTopic()
|
||||
|
||||
val admin = cluster.admin()
|
||||
val topicName = "foo"
|
||||
|
||||
try {
|
||||
TestUtils.createTopicWithAdminRaw(
|
||||
admin = admin,
|
||||
topic = topicName,
|
||||
numPartitions = 3
|
||||
)
|
||||
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
admin.listTopics().names().get().contains(topicName)
|
||||
}, msg = s"Topic $topicName is not available to the group coordinator")
|
||||
|
||||
val timeoutMs = 5 * 60 * 1000
|
||||
val clientId = "client-id"
|
||||
val clientHost = "/127.0.0.1"
|
||||
val authorizedOperationsInt = Utils.to32BitField(
|
||||
AclEntry.supportedOperations(ResourceType.GROUP).asScala
|
||||
.map(_.code.asInstanceOf[JByte]).asJava)
|
||||
|
||||
var grp1Member1Response: StreamsGroupHeartbeatResponseData = null
|
||||
var grp1Member2Response: StreamsGroupHeartbeatResponseData = null
|
||||
var grp2Member1Response: StreamsGroupHeartbeatResponseData = null
|
||||
var grp2Member2Response: StreamsGroupHeartbeatResponseData = null
|
||||
|
||||
// grp-1 with 2 members
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
grp1Member1Response = streamsGroupHeartbeat(
|
||||
groupId = "grp-1",
|
||||
memberId = "member-1",
|
||||
rebalanceTimeoutMs = timeoutMs,
|
||||
activeTasks = List.empty,
|
||||
standbyTasks = List.empty,
|
||||
warmupTasks = List.empty,
|
||||
topology = new StreamsGroupHeartbeatRequestData.Topology()
|
||||
.setEpoch(1)
|
||||
.setSubtopologies(List(
|
||||
new StreamsGroupHeartbeatRequestData.Subtopology()
|
||||
.setSubtopologyId("subtopology-1")
|
||||
.setSourceTopics(List(topicName).asJava)
|
||||
.setRepartitionSinkTopics(List.empty.asJava)
|
||||
.setRepartitionSourceTopics(List.empty.asJava)
|
||||
.setStateChangelogTopics(List.empty.asJava)
|
||||
).asJava)
|
||||
)
|
||||
grp1Member2Response = streamsGroupHeartbeat(
|
||||
groupId = "grp-1",
|
||||
memberId = "member-2",
|
||||
rebalanceTimeoutMs = timeoutMs,
|
||||
activeTasks = List.empty,
|
||||
standbyTasks = List.empty,
|
||||
warmupTasks = List.empty,
|
||||
topology = new StreamsGroupHeartbeatRequestData.Topology()
|
||||
.setEpoch(1)
|
||||
.setSubtopologies(List(
|
||||
new StreamsGroupHeartbeatRequestData.Subtopology()
|
||||
.setSubtopologyId("subtopology-1")
|
||||
.setSourceTopics(List(topicName).asJava)
|
||||
.setRepartitionSinkTopics(List.empty.asJava)
|
||||
.setRepartitionSourceTopics(List.empty.asJava)
|
||||
.setStateChangelogTopics(List.empty.asJava)
|
||||
).asJava)
|
||||
)
|
||||
|
||||
val groupsDescription1 = streamsGroupDescribe(
|
||||
groupIds = List("grp-1"),
|
||||
includeAuthorizedOperations = true
|
||||
)
|
||||
grp1Member1Response.errorCode == Errors.NONE.code && grp1Member2Response.errorCode == Errors.NONE.code &&
|
||||
groupsDescription1.size == 1 && groupsDescription1.head.members.size == 2
|
||||
}, msg = s"Could not create grp-1 with 2 members successfully")
|
||||
|
||||
// grp-2 with 2 members
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
grp2Member1Response = streamsGroupHeartbeat(
|
||||
groupId = "grp-2",
|
||||
memberId = "member-3",
|
||||
rebalanceTimeoutMs = timeoutMs,
|
||||
activeTasks = List.empty,
|
||||
standbyTasks = List.empty,
|
||||
warmupTasks = List.empty,
|
||||
topology = new StreamsGroupHeartbeatRequestData.Topology()
|
||||
.setEpoch(1)
|
||||
.setSubtopologies(List(
|
||||
new StreamsGroupHeartbeatRequestData.Subtopology()
|
||||
.setSubtopologyId("subtopology-1")
|
||||
.setSourceTopics(List(topicName).asJava)
|
||||
.setRepartitionSinkTopics(List.empty.asJava)
|
||||
.setRepartitionSourceTopics(List.empty.asJava)
|
||||
.setStateChangelogTopics(List.empty.asJava)
|
||||
).asJava)
|
||||
)
|
||||
grp2Member2Response = streamsGroupHeartbeat(
|
||||
groupId = "grp-2",
|
||||
memberId = "member-4",
|
||||
rebalanceTimeoutMs = timeoutMs,
|
||||
activeTasks = List.empty,
|
||||
standbyTasks = List.empty,
|
||||
warmupTasks = List.empty,
|
||||
topology = new StreamsGroupHeartbeatRequestData.Topology()
|
||||
.setEpoch(1)
|
||||
.setSubtopologies(List(
|
||||
new StreamsGroupHeartbeatRequestData.Subtopology()
|
||||
.setSubtopologyId("subtopology-1")
|
||||
.setSourceTopics(List(topicName).asJava)
|
||||
.setRepartitionSinkTopics(List.empty.asJava)
|
||||
.setRepartitionSourceTopics(List.empty.asJava)
|
||||
.setStateChangelogTopics(List.empty.asJava)
|
||||
).asJava)
|
||||
)
|
||||
val groupsDescription2 = streamsGroupDescribe(
|
||||
groupIds = List("grp-2"),
|
||||
includeAuthorizedOperations = true,
|
||||
version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
|
||||
)
|
||||
|
||||
grp2Member1Response.errorCode == Errors.NONE.code && grp2Member2Response.errorCode == Errors.NONE.code &&
|
||||
groupsDescription2.size == 1 && groupsDescription2.head.members.size == 2
|
||||
}, msg = s"Could not create grp-2 with 2 members successfully")
|
||||
|
||||
// Send follow-up heartbeats until both groups are stable
|
||||
TestUtils.waitUntilTrue(() => {
|
||||
grp1Member1Response = streamsGroupHeartbeat(
|
||||
groupId = "grp-1",
|
||||
memberId = grp1Member1Response.memberId,
|
||||
memberEpoch = grp1Member1Response.memberEpoch,
|
||||
rebalanceTimeoutMs = timeoutMs,
|
||||
activeTasks = convertTaskIds(grp1Member1Response.activeTasks),
|
||||
standbyTasks = convertTaskIds(grp1Member1Response.standbyTasks),
|
||||
warmupTasks = convertTaskIds(grp1Member1Response.warmupTasks),
|
||||
topology = null
|
||||
)
|
||||
grp1Member2Response = streamsGroupHeartbeat(
|
||||
groupId = "grp-1",
|
||||
memberId = grp1Member2Response.memberId,
|
||||
memberEpoch = grp1Member2Response.memberEpoch,
|
||||
rebalanceTimeoutMs = timeoutMs,
|
||||
activeTasks = convertTaskIds(grp1Member2Response.activeTasks),
|
||||
standbyTasks = convertTaskIds(grp1Member2Response.standbyTasks),
|
||||
warmupTasks = convertTaskIds(grp1Member2Response.warmupTasks),
|
||||
topology = null
|
||||
)
|
||||
grp2Member1Response = streamsGroupHeartbeat(
|
||||
groupId = "grp-2",
|
||||
memberId = grp2Member1Response.memberId,
|
||||
memberEpoch = grp2Member1Response.memberEpoch,
|
||||
rebalanceTimeoutMs = timeoutMs,
|
||||
activeTasks = convertTaskIds(grp2Member1Response.activeTasks),
|
||||
standbyTasks = convertTaskIds(grp2Member1Response.standbyTasks),
|
||||
warmupTasks = convertTaskIds(grp2Member1Response.warmupTasks),
|
||||
topology = null
|
||||
)
|
||||
grp2Member2Response = streamsGroupHeartbeat(
|
||||
groupId = "grp-2",
|
||||
memberId = grp2Member2Response.memberId,
|
||||
memberEpoch = grp2Member2Response.memberEpoch,
|
||||
rebalanceTimeoutMs = timeoutMs,
|
||||
activeTasks = convertTaskIds(grp2Member2Response.activeTasks),
|
||||
standbyTasks = convertTaskIds(grp2Member2Response.standbyTasks),
|
||||
warmupTasks = convertTaskIds(grp2Member2Response.warmupTasks),
|
||||
topology = null
|
||||
)
|
||||
val actual = streamsGroupDescribe(
|
||||
groupIds = List("grp-1","grp-2"),
|
||||
includeAuthorizedOperations = true,
|
||||
version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
|
||||
)
|
||||
actual.head.groupState() == "Stable" && actual(1).groupState() == "Stable" &&
|
||||
actual.head.members.size == 2 && actual(1).members.size == 2
|
||||
}, "Two groups did not stabilize with 2 members each in time")
|
||||
|
||||
// Test the describe request for both groups in stable state
|
||||
for (version <- ApiKeys.STREAMS_GROUP_DESCRIBE.oldestVersion() to ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
|
||||
val actual = streamsGroupDescribe(
|
||||
groupIds = List("grp-1","grp-2"),
|
||||
includeAuthorizedOperations = true,
|
||||
version = version.toShort
|
||||
)
|
||||
|
||||
assertEquals(2, actual.size)
|
||||
assertEquals(actual.map(_.groupId).toSet, Set("grp-1", "grp-2"))
|
||||
for (describedGroup <- actual) {
|
||||
assertEquals("Stable", describedGroup.groupState)
|
||||
assertTrue("Group epoch is not equal to the assignment epoch", describedGroup.groupEpoch == describedGroup.assignmentEpoch)
|
||||
// Verify topology
|
||||
assertEquals(1, describedGroup.topology.epoch)
|
||||
assertEquals(1, describedGroup.topology.subtopologies.size)
|
||||
assertEquals("subtopology-1", describedGroup.topology.subtopologies.get(0).subtopologyId)
|
||||
assertEquals(List(topicName).asJava, describedGroup.topology.subtopologies.get(0).sourceTopics)
|
||||
|
||||
// Verify members
|
||||
assertEquals(2, describedGroup.members.size)
|
||||
val expectedMemberIds = describedGroup.groupId match {
|
||||
case "grp-1" => Set(grp1Member1Response.memberId, grp1Member2Response.memberId)
|
||||
case "grp-2" => Set(grp2Member1Response.memberId, grp2Member2Response.memberId)
|
||||
case unexpected => throw new AssertionError(s"Unexpected group ID: $unexpected")
|
||||
}
|
||||
|
||||
val actualMemberIds = describedGroup.members.asScala.map(_.memberId).toSet
|
||||
assertEquals(expectedMemberIds, actualMemberIds)
|
||||
assertEquals(authorizedOperationsInt, describedGroup.authorizedOperations)
|
||||
|
||||
describedGroup.members.asScala.foreach { member =>
|
||||
assertTrue("Group epoch is not equal to the member epoch", member.memberEpoch == describedGroup.assignmentEpoch)
|
||||
assertEquals(1, member.topologyEpoch)
|
||||
assertEquals(member.targetAssignment, member.assignment)
|
||||
assertEquals(clientId, member.clientId())
|
||||
assertEquals(clientHost, member.clientHost())
|
||||
}
|
||||
// Verify all partitions 0, 1, 2 are assigned exactly once
|
||||
val allAssignedPartitions = describedGroup.members.asScala.flatMap { member =>
|
||||
member.assignment.activeTasks.asScala.flatMap(_.partitions.asScala)
|
||||
}.toList
|
||||
assertEquals(List(0, 1, 2).sorted, allAssignedPartitions.sorted)
|
||||
}
|
||||
}
|
||||
} finally{
|
||||
admin.close()
|
||||
}
|
||||
}
|
||||
|
||||
private def convertTaskIds(responseTasks: java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): List[StreamsGroupHeartbeatRequestData.TaskIds] = {
|
||||
if (responseTasks == null) {
|
||||
List.empty
|
||||
} else {
|
||||
responseTasks.asScala.map { responseTask =>
|
||||
new StreamsGroupHeartbeatRequestData.TaskIds()
|
||||
.setSubtopologyId(responseTask.subtopologyId)
|
||||
.setPartitions(responseTask.partitions)
|
||||
}.toList
|
||||
}
|
||||
}
|
||||
}
|
|
@ -281,6 +281,11 @@ public class KafkaStreamsTelemetryIntegrationTest {
|
|||
streamsApplicationProperties = props(groupProtocol);
|
||||
final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();
|
||||
|
||||
shouldPassMetrics(topology, FIRST_INSTANCE_CLIENT);
|
||||
shouldPassMetrics(topology, SECOND_INSTANCE_CLIENT);
|
||||
}
|
||||
|
||||
private void shouldPassMetrics(final Topology topology, final int clientInstance) throws Exception {
|
||||
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
|
||||
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
|
||||
|
||||
|
@ -292,8 +297,8 @@ public class KafkaStreamsTelemetryIntegrationTest {
|
|||
|
||||
|
||||
|
||||
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics().stream().map(KafkaMetric::metricName).toList();
|
||||
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
|
||||
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(clientInstance).passedMetrics().stream().map(KafkaMetric::metricName).toList();
|
||||
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(clientInstance).passedMetrics.stream().map(KafkaMetric::metricName).toList();
|
||||
|
||||
|
||||
assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());
|
||||
|
|
|
@ -71,6 +71,17 @@ public interface StateStore {
|
|||
*/
|
||||
void init(final StateStoreContext stateStoreContext, final StateStore root);
|
||||
|
||||
|
||||
/**
|
||||
* Assigns the store to a stream thread.
|
||||
* <p>
|
||||
* This function is called from the final stream thread,
|
||||
* thus can be used to initialize resources that might require to know the running thread, e.g. metrics.
|
||||
* </p>
|
||||
* To access the thread use {@link Thread#currentThread()}
|
||||
*/
|
||||
default void assignThread() { }
|
||||
|
||||
/**
|
||||
* Flush any cached data
|
||||
*/
|
||||
|
|
|
@ -48,6 +48,11 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
|
|||
throw new UnsupportedOperationException(ERROR_MESSAGE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
throw new UnsupportedOperationException(ERROR_MESSAGE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
throw new UnsupportedOperationException(ERROR_MESSAGE);
|
||||
|
|
|
@ -78,6 +78,11 @@ public class ReadOnlyTask implements Task {
|
|||
throw new UnsupportedOperationException("This task is read-only");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
throw new UnsupportedOperationException("This task is read-only");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
|
||||
throw new UnsupportedOperationException("This task is read-only");
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.kafka.streams.TopologyConfig.TaskConfig;
|
|||
import org.apache.kafka.streams.errors.StreamsException;
|
||||
import org.apache.kafka.streams.errors.TaskCorruptedException;
|
||||
import org.apache.kafka.streams.errors.TaskMigratedException;
|
||||
import org.apache.kafka.streams.processor.StateStore;
|
||||
import org.apache.kafka.streams.processor.TaskId;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
|
||||
|
@ -45,8 +46,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
|
|||
*/
|
||||
public class StandbyTask extends AbstractTask implements Task {
|
||||
private final boolean eosEnabled;
|
||||
private final Sensor closeTaskSensor;
|
||||
private final Sensor updateSensor;
|
||||
private Sensor closeTaskSensor;
|
||||
private Sensor updateSensor;
|
||||
private final StreamsMetricsImpl streamsMetrics;
|
||||
|
||||
protected final InternalProcessorContext<?, ?> processorContext;
|
||||
|
@ -83,8 +84,6 @@ public class StandbyTask extends AbstractTask implements Task {
|
|||
this.streamsMetrics = streamsMetrics;
|
||||
processorContext.transitionToStandby(cache);
|
||||
|
||||
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
|
||||
updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics);
|
||||
this.eosEnabled = config.eosEnabled;
|
||||
}
|
||||
|
||||
|
@ -129,6 +128,15 @@ public class StandbyTask extends AbstractTask implements Task {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
|
||||
updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics);
|
||||
for (final StateStore stateStore : topology.stateStores()) {
|
||||
stateStore.assignThread();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
|
||||
throw new IllegalStateException("Standby task " + id + " should never be completing restoration");
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.kafka.streams.errors.internals.FailedProcessingException;
|
|||
import org.apache.kafka.streams.processor.Cancellable;
|
||||
import org.apache.kafka.streams.processor.PunctuationType;
|
||||
import org.apache.kafka.streams.processor.Punctuator;
|
||||
import org.apache.kafka.streams.processor.StateStore;
|
||||
import org.apache.kafka.streams.processor.TaskId;
|
||||
import org.apache.kafka.streams.processor.TimestampExtractor;
|
||||
import org.apache.kafka.streams.processor.api.Record;
|
||||
|
@ -278,6 +279,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
for (final StateStore stateStore : topology.stateStores()) {
|
||||
stateStore.assignThread();
|
||||
}
|
||||
}
|
||||
|
||||
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
|
||||
mainConsumer.pause(partitionsForOffsetReset);
|
||||
resetOffsetsForPartitions.addAll(partitionsForOffsetReset);
|
||||
|
|
|
@ -110,6 +110,8 @@ public interface Task {
|
|||
*/
|
||||
void initializeIfNeeded();
|
||||
|
||||
void assignThread();
|
||||
|
||||
default void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
|
|
@ -343,6 +343,8 @@ public class TaskManager {
|
|||
final TaskId taskId = entry.getKey();
|
||||
final Task task = stateDirectory.removeStartupTask(taskId);
|
||||
if (task != null) {
|
||||
task.assignThread();
|
||||
|
||||
// replace our dummy values with the real ones, now we know our thread and assignment
|
||||
final Set<TopicPartition> inputPartitions = entry.getValue();
|
||||
task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions);
|
||||
|
@ -928,6 +930,7 @@ public class TaskManager {
|
|||
for (final Task task : tasks.allTasks()) {
|
||||
try {
|
||||
task.initializeIfNeeded();
|
||||
task.assignThread();
|
||||
task.clearTaskTimeout();
|
||||
} catch (final LockException lockException) {
|
||||
// it is possible that if there are multiple threads within the instance that one thread
|
||||
|
@ -1082,6 +1085,7 @@ public class TaskManager {
|
|||
try {
|
||||
if (canTryInitializeTask(task.id(), nowMs)) {
|
||||
task.initializeIfNeeded();
|
||||
task.assignThread();
|
||||
taskIdToBackoffRecord.remove(task.id());
|
||||
stateUpdater.add(task);
|
||||
} else {
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.kafka.streams.processor.StateStoreContext;
|
|||
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
|
||||
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
|
||||
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.state.KeyValueIterator;
|
||||
|
@ -243,16 +242,6 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
|
|||
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
|
||||
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);
|
||||
|
||||
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
|
||||
final String threadId = Thread.currentThread().getName();
|
||||
final String taskName = stateStoreContext.taskId().toString();
|
||||
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
|
||||
threadId,
|
||||
taskName,
|
||||
metrics
|
||||
);
|
||||
|
||||
final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
|
||||
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
|
||||
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
|
||||
|
@ -276,6 +265,15 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
|
|||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
|
||||
Thread.currentThread().getName(),
|
||||
internalProcessorContext.taskId().toString(),
|
||||
ProcessorContextUtils.metricsImpl(internalProcessorContext)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
segments.flush();
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializati
|
|||
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
|
||||
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
|
||||
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.state.KeyValueIterator;
|
||||
|
@ -294,16 +293,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
|
|||
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
|
||||
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);
|
||||
|
||||
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
|
||||
final String threadId = Thread.currentThread().getName();
|
||||
final String taskName = stateStoreContext.taskId().toString();
|
||||
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
|
||||
threadId,
|
||||
taskName,
|
||||
metrics
|
||||
);
|
||||
|
||||
final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
|
||||
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
|
||||
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
|
||||
|
@ -325,6 +314,15 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
|
|||
false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
|
||||
Thread.currentThread().getName(),
|
||||
internalProcessorContext.taskId().toString(),
|
||||
ProcessorContextUtils.metricsImpl(internalProcessorContext)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
segments.flush();
|
||||
|
@ -404,4 +402,4 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
|
|||
public Position getPosition() {
|
||||
return position;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -104,6 +104,11 @@ public class CachingKeyValueStore
|
|||
}
|
||||
});
|
||||
super.init(stateStoreContext, root);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
super.assignThread();
|
||||
// save the stream thread as we only ever want to trigger a flush
|
||||
// when the stream thread is the current thread.
|
||||
streamThread = Thread.currentThread();
|
||||
|
|
|
@ -25,10 +25,10 @@ import org.apache.kafka.streams.kstream.Windowed;
|
|||
import org.apache.kafka.streams.kstream.internals.SessionWindow;
|
||||
import org.apache.kafka.streams.processor.StateStore;
|
||||
import org.apache.kafka.streams.processor.StateStoreContext;
|
||||
import org.apache.kafka.streams.processor.TaskId;
|
||||
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
|
||||
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
|
||||
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.query.PositionBound;
|
||||
|
@ -78,6 +78,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
|
|||
|
||||
private StateStoreContext stateStoreContext;
|
||||
private final Position position;
|
||||
private TaskId taskId;
|
||||
|
||||
InMemorySessionStore(final String name,
|
||||
final long retentionPeriod,
|
||||
|
@ -97,22 +98,14 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
|
|||
public void init(final StateStoreContext stateStoreContext,
|
||||
final StateStore root) {
|
||||
this.stateStoreContext = stateStoreContext;
|
||||
final String threadId = Thread.currentThread().getName();
|
||||
final String taskName = stateStoreContext.taskId().toString();
|
||||
taskId = stateStoreContext.taskId();
|
||||
|
||||
// The provided context is not required to implement InternalProcessorContext,
|
||||
// If it doesn't, we can't record this metric.
|
||||
if (stateStoreContext instanceof InternalProcessorContext) {
|
||||
this.context = (InternalProcessorContext<?, ?>) stateStoreContext;
|
||||
final StreamsMetricsImpl metrics = this.context.metrics();
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
|
||||
threadId,
|
||||
taskName,
|
||||
metrics
|
||||
);
|
||||
} else {
|
||||
this.context = null;
|
||||
expiredRecordSensor = null;
|
||||
}
|
||||
|
||||
if (root != null) {
|
||||
|
@ -140,6 +133,19 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
|
|||
open = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
if (context != null) {
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
|
||||
Thread.currentThread().getName(),
|
||||
taskId.toString(),
|
||||
this.context.metrics()
|
||||
);
|
||||
} else {
|
||||
expiredRecordSensor = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Position getPosition() {
|
||||
return position;
|
||||
|
|
|
@ -202,6 +202,14 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
|
|||
taskId = context.taskId().toString();
|
||||
streamsMetrics = context.metrics();
|
||||
|
||||
this.context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
|
||||
updateBufferMetrics();
|
||||
open = true;
|
||||
partition = context.taskId().partition();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor(
|
||||
taskId,
|
||||
METRIC_SCOPE,
|
||||
|
@ -214,11 +222,6 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
|
|||
storeName,
|
||||
streamsMetrics
|
||||
);
|
||||
|
||||
this.context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
|
||||
updateBufferMetrics();
|
||||
open = true;
|
||||
partition = context.taskId().partition();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializati
|
|||
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
|
||||
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
|
||||
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.query.PositionBound;
|
||||
|
@ -104,15 +103,6 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
|
|||
final StateStore root) {
|
||||
this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
|
||||
|
||||
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
|
||||
final String threadId = Thread.currentThread().getName();
|
||||
final String taskName = stateStoreContext.taskId().toString();
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
|
||||
threadId,
|
||||
taskName,
|
||||
metrics
|
||||
);
|
||||
|
||||
if (root != null) {
|
||||
final boolean consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
|
||||
stateStoreContext.appConfigs(),
|
||||
|
@ -142,6 +132,15 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
|
|||
open = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
|
||||
Thread.currentThread().getName(),
|
||||
internalProcessorContext.taskId().toString(),
|
||||
ProcessorContextUtils.metricsImpl(internalProcessorContext)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Position getPosition() {
|
||||
return position;
|
||||
|
|
|
@ -65,6 +65,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
|
|||
@Override
|
||||
public void openExisting(final StateStoreContext context, final long streamTime) {
|
||||
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
|
||||
metricsRecorder.assignThread();
|
||||
super.openExisting(context, streamTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -126,6 +126,11 @@ public class KeyValueStoreWrapper<K, V> implements StateStore {
|
|||
store.init(stateStoreContext, root);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
store.assignThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
store.flush();
|
||||
|
|
|
@ -99,6 +99,11 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
|
|||
store.init(stateStoreContext, root);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
store.assignThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
store.flush();
|
||||
|
@ -235,4 +240,4 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
|
|||
return KeyValue.pair(next.key, convertToTimestampedFormat(next.value));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -141,6 +141,11 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
|
|||
throw new UnsupportedOperationException("cannot initialize a logical segment");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
throw new UnsupportedOperationException("nothing to reassign");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
throw new UnsupportedOperationException("nothing to flush for logical segment");
|
||||
|
@ -368,4 +373,4 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
|
|||
private static byte[] serializeLongToBytes(final long l) {
|
||||
return ByteBuffer.allocate(Long.BYTES).putLong(l).array();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -105,6 +105,7 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSeg
|
|||
@Override
|
||||
public void openExisting(final StateStoreContext context, final long streamTime) {
|
||||
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
|
||||
metricsRecorder.assignThread();
|
||||
physicalStore.openDB(context.appConfigs(), context.stateDir());
|
||||
}
|
||||
|
||||
|
@ -142,4 +143,4 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSeg
|
|||
|
||||
return super.segmentName(segmentId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -128,13 +128,15 @@ public class MeteredKeyValueStore<K, V>
|
|||
initStoreSerde(stateStoreContext);
|
||||
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
|
||||
|
||||
registerMetrics();
|
||||
|
||||
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
||||
|
||||
super.init(stateStoreContext, root);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
registerMetrics();
|
||||
super.assignThread();
|
||||
}
|
||||
|
||||
private void registerMetrics() {
|
||||
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
||||
putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
||||
|
@ -150,6 +152,7 @@ public class MeteredKeyValueStore<K, V>
|
|||
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
|
||||
(config, now) -> openIterators.sum());
|
||||
openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics);
|
||||
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -108,12 +108,15 @@ public class MeteredSessionStore<K, V>
|
|||
initStoreSerde(stateStoreContext);
|
||||
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
|
||||
|
||||
registerMetrics();
|
||||
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
||||
|
||||
super.init(stateStoreContext, root);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
registerMetrics();
|
||||
super.assignThread();
|
||||
}
|
||||
|
||||
private void registerMetrics() {
|
||||
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
||||
fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
||||
|
@ -129,6 +132,8 @@ public class MeteredSessionStore<K, V>
|
|||
return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : null;
|
||||
}
|
||||
);
|
||||
|
||||
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -179,6 +179,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
metricsRecorder.assignThread();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
void openDB(final Map<String, Object> configs, final File stateDir) {
|
||||
// initialize the default rocksdb options
|
||||
|
@ -1016,4 +1021,4 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
|
|||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializati
|
|||
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
|
||||
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
|
||||
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
|
||||
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
|
||||
import org.apache.kafka.streams.query.Position;
|
||||
import org.apache.kafka.streams.query.PositionBound;
|
||||
|
@ -352,16 +351,6 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
|
|||
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
|
||||
this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
|
||||
|
||||
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
|
||||
final String threadId = Thread.currentThread().getName();
|
||||
final String taskName = stateStoreContext.taskId().toString();
|
||||
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
|
||||
threadId,
|
||||
taskName,
|
||||
metrics
|
||||
);
|
||||
|
||||
metricsRecorder.init(ProcessorContextUtils.metricsImpl(stateStoreContext), stateStoreContext.taskId());
|
||||
|
||||
final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
|
||||
|
@ -386,6 +375,16 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
|
|||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
|
||||
Thread.currentThread().getName(),
|
||||
internalProcessorContext.taskId().toString(),
|
||||
ProcessorContextUtils.metricsImpl(internalProcessorContext)
|
||||
);
|
||||
metricsRecorder.assignThread();
|
||||
}
|
||||
|
||||
// VisibleForTesting
|
||||
void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
|
||||
|
||||
|
@ -1012,4 +1011,4 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
|
|||
private static String latestValueStoreName(final String storeName) {
|
||||
return storeName + ".latestValues";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,6 +65,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
|
|||
@Override
|
||||
public void openExisting(final StateStoreContext context, final long streamTime) {
|
||||
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
|
||||
metricsRecorder.assignThread();
|
||||
super.openExisting(context, streamTime);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -92,6 +92,11 @@ public class VersionedKeyValueToBytesStoreAdapter implements VersionedBytesStore
|
|||
inner.init(stateStoreContext, root);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
inner.assignThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
inner.flush();
|
||||
|
@ -180,4 +185,4 @@ public class VersionedKeyValueToBytesStoreAdapter implements VersionedBytesStore
|
|||
null,
|
||||
ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -160,6 +160,11 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
|
|||
store.init(stateStoreContext, root);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
store.assignThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
store.flush();
|
||||
|
@ -212,4 +217,4 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
|
|||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,6 +63,11 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
|
|||
wrapped.init(stateStoreContext, root);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
wrapped.assignThread();
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public boolean setFlushListener(final CacheFlushListener<K, V> listener,
|
||||
|
|
|
@ -146,11 +146,14 @@ public class RocksDBMetricsRecorder {
|
|||
+ "This is a bug in Kafka Streams. " +
|
||||
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
|
||||
}
|
||||
this.taskId = taskId;
|
||||
this.streamsMetrics = streamsMetrics;
|
||||
}
|
||||
|
||||
public void assignThread() {
|
||||
final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskId.toString(), metricsScope, storeName);
|
||||
initSensors(streamsMetrics, metricContext);
|
||||
initGauges(streamsMetrics, metricContext);
|
||||
this.taskId = taskId;
|
||||
this.streamsMetrics = streamsMetrics;
|
||||
}
|
||||
|
||||
public void addValueProviders(final String segmentName,
|
||||
|
@ -510,4 +513,4 @@ public class RocksDBMetricsRecorder {
|
|||
}
|
||||
return (double) sum / count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4898,6 +4898,10 @@ public class TaskManagerTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
|
||||
this.partitionsForOffsetReset = partitionsForOffsetReset;
|
||||
|
|
|
@ -1213,6 +1213,11 @@ public class TopologyTestDriver implements Closeable {
|
|||
inner.init(stateStoreContext, root);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
inner.assignThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(final K key, final V value) {
|
||||
inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
|
||||
|
@ -1277,6 +1282,11 @@ public class TopologyTestDriver implements Closeable {
|
|||
inner.init(stateStoreContext, root);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void assignThread() {
|
||||
inner.assignThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(final K key,
|
||||
final V value,
|
||||
|
|
Loading…
Reference in New Issue