Compare commits

...

9 Commits

Author SHA1 Message Date
Nikita Shupletsov 45505f4719
Merge 121e934f7b into 2938c4242e 2025-10-07 09:14:56 -07:00
lucliu1108 2938c4242e
KAFKA-19754: Add RPC-level integration test for StreamsGroupDescribeRequest (#20632)
Test the `StreamsGroupDescribeRequest` RPC and the corresponding responses
for situations where:
- `streams.version` is not upgraded to 1
- `streams.version` is enabled and multiple groups listen to the same topic.

Reviewers: Lucas Brutschy <lucasbru@apache.org>
2025-10-07 15:47:32 +02:00
Nikita 121e934f7b
Made assignThread a default method with an empty implementation.
Removed the now-empty overrides of the method.
2025-10-03 11:19:11 -07:00
Nikita bfda2319c6
Merge branch 'trunk' into KAFKA-19434
Conflicts:
	streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
	streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
2025-10-03 10:19:07 -07:00
Nikita b1c4108aa4
Small refactoring to address comments. 2025-09-26 14:37:38 -07:00
Nikita 9566c6429e
Renamed the method to better reflect its purpose.
Moved thread assignment from init to assignThread in CachingKeyValueStore.
Moved restoreSensor creation to assignThread. It didn't work as is; the behavior will not be that much different.
2025-09-26 14:14:36 -07:00
Nikita 576d072707
Extracted metric creation to a separate method to separate task initialization from metric initialization. 2025-09-25 14:58:30 -07:00
Nikita 19a7864f62
Passed clientInstance to avoid cleaning the consumers.
Marked the parameters final.
2025-09-24 09:28:31 -07:00
Nikita 23476deddc
KAFKA-19434: moved the metric initialization to task assignment to have the correct thread. 2025-09-22 17:25:02 -07:00
32 changed files with 570 additions and 85 deletions
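At a high level, the KAFKA-19434 commits move per-thread metric creation out of StateStore.init() (which can run on a thread that is not the eventual owner, e.g. when startup tasks are pre-initialized) and into a new assignThread() callback that is invoked once the owning stream thread is known. A rough sketch of that pattern follows; the class ExampleSegmentedStore is hypothetical, and only TaskMetrics.droppedRecordsSensor and ProcessorContextUtils.metricsImpl are taken from the diffs below:

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;

// Hypothetical store fragment, not code from this change set: it only shows where
// the per-thread sensor creation moves.
class ExampleSegmentedStore {
    private StateStoreContext context;
    private Sensor expiredRecordSensor;

    // Before: the sensor was created in init(), tagged with whichever thread happened
    // to call it (e.g. the main thread when startup tasks are pre-initialized).
    void init(final StateStoreContext stateStoreContext, final StateStore root) {
        this.context = stateStoreContext;
        // ... register restore callbacks, read the position checkpoint, open segments ...
    }

    // After: the sensor is created once the owning stream thread is known.
    void assignThread() {
        expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
            Thread.currentThread().getName(),             // the owning stream thread
            context.taskId().toString(),
            ProcessorContextUtils.metricsImpl(context));
    }
}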

View File

@ -26,9 +26,9 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment
import org.apache.kafka.common.message.WriteTxnMarkersRequestData.{WritableTxnMarker, WritableTxnMarkerTopic}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.message.{AddOffsetsToTxnRequestData, AddOffsetsToTxnResponseData, ConsumerGroupDescribeRequestData, ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsRequestData, DeleteGroupsResponseData, DescribeGroupsRequestData, DescribeGroupsResponseData, EndTxnRequestData, HeartbeatRequestData, HeartbeatResponseData, InitProducerIdRequestData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeRequestData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData, WriteTxnMarkersRequestData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, DeleteGroupsRequest, DeleteGroupsResponse, DescribeGroupsRequest, DescribeGroupsResponse, EndTxnRequest, EndTxnResponse, HeartbeatRequest, HeartbeatResponse, InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsRequest, ListGroupsResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, ShareGroupDescribeRequest, ShareGroupDescribeResponse, ShareGroupHeartbeatRequest, ShareGroupHeartbeatResponse, StreamsGroupDescribeRequest, StreamsGroupDescribeResponse, StreamsGroupHeartbeatRequest, StreamsGroupHeartbeatResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.utils.ProducerIdAndEpoch
@ -768,6 +768,21 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
shareGroupDescribeResponse.data.groups.asScala.toList
}
protected def streamsGroupDescribe(
groupIds: List[String],
includeAuthorizedOperations: Boolean = false,
version: Short = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)
): List[StreamsGroupDescribeResponseData.DescribedGroup] = {
val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
new StreamsGroupDescribeRequestData()
.setGroupIds(groupIds.asJava)
.setIncludeAuthorizedOperations(includeAuthorizedOperations)
).build(version)
val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
streamsGroupDescribeResponse.data.groups.asScala.toList
}
protected def heartbeat(
groupId: String,
generationId: Int,
@ -855,6 +870,41 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
shareGroupHeartbeatResponse.data
}
protected def streamsGroupHeartbeat(
groupId: String,
memberId: String = "",
memberEpoch: Int = 0,
rebalanceTimeoutMs: Int = -1,
activeTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
standbyTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
warmupTasks: List[StreamsGroupHeartbeatRequestData.TaskIds] = null,
topology: StreamsGroupHeartbeatRequestData.Topology = null,
expectedError: Errors = Errors.NONE,
version: Short = ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled)
): StreamsGroupHeartbeatResponseData = {
val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder(
new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
.setMemberEpoch(memberEpoch)
.setRebalanceTimeoutMs(rebalanceTimeoutMs)
.setActiveTasks(activeTasks.asJava)
.setStandbyTasks(standbyTasks.asJava)
.setWarmupTasks(warmupTasks.asJava)
.setTopology(topology)
).build(version)
// Send the request until receiving a successful response. There is a delay
// here because the group coordinator is loaded in the background.
var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
streamsGroupHeartbeatResponse.data.errorCode == expectedError.code
}, msg = s"Could not heartbeat successfully. Last response $streamsGroupHeartbeatResponse.")
streamsGroupHeartbeatResponse.data
}
protected def leaveGroupWithNewProtocol(
groupId: String,
memberId: String

View File

@ -0,0 +1,316 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import kafka.utils.TestUtils
import org.apache.kafka.common.message.{StreamsGroupDescribeRequestData, StreamsGroupDescribeResponseData, StreamsGroupHeartbeatRequestData, StreamsGroupHeartbeatResponseData}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{StreamsGroupDescribeRequest, StreamsGroupDescribeResponse}
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.common.test.api._
import scala.jdk.CollectionConverters._
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.common.Feature
import org.junit.Assert.{assertEquals, assertTrue}
import java.lang.{Byte => JByte}
@ClusterTestDefaults(
types = Array(Type.KRAFT),
brokers = 1,
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
class StreamsGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@ClusterTest(
features = Array(
new ClusterFeature(feature = Feature.STREAMS_VERSION, version = 0)
)
)
def testStreamsGroupDescribeWhenFeatureFlagNotEnabled(): Unit = {
val streamsGroupDescribeRequest = new StreamsGroupDescribeRequest.Builder(
new StreamsGroupDescribeRequestData().setGroupIds(List("grp-mock-1", "grp-mock-2").asJava)
).build(ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled))
val streamsGroupDescribeResponse = connectAndReceive[StreamsGroupDescribeResponse](streamsGroupDescribeRequest)
val expectedResponse = new StreamsGroupDescribeResponseData()
expectedResponse.groups().add(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-mock-1")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
expectedResponse.groups().add(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp-mock-2")
.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
)
assertEquals(expectedResponse, streamsGroupDescribeResponse.data)
}
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer,streams"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
)
)
def testStreamsGroupDescribeGroupsWithNewGroupCoordinator(): Unit = {
// Creates the __consumer_offsets topic because it won't be created automatically
// in this test, which does not use the FindCoordinator API.
createOffsetsTopic()
val admin = cluster.admin()
val topicName = "foo"
try {
TestUtils.createTopicWithAdminRaw(
admin = admin,
topic = topicName,
numPartitions = 3
)
TestUtils.waitUntilTrue(() => {
admin.listTopics().names().get().contains(topicName)
}, msg = s"Topic $topicName is not available to the group coordinator")
val timeoutMs = 5 * 60 * 1000
val clientId = "client-id"
val clientHost = "/127.0.0.1"
val authorizedOperationsInt = Utils.to32BitField(
AclEntry.supportedOperations(ResourceType.GROUP).asScala
.map(_.code.asInstanceOf[JByte]).asJava)
var grp1Member1Response: StreamsGroupHeartbeatResponseData = null
var grp1Member2Response: StreamsGroupHeartbeatResponseData = null
var grp2Member1Response: StreamsGroupHeartbeatResponseData = null
var grp2Member2Response: StreamsGroupHeartbeatResponseData = null
// grp-1 with 2 members
TestUtils.waitUntilTrue(() => {
grp1Member1Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = "member-1",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
grp1Member2Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = "member-2",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
val groupsDescription1 = streamsGroupDescribe(
groupIds = List("grp-1"),
includeAuthorizedOperations = true
)
grp1Member1Response.errorCode == Errors.NONE.code && grp1Member2Response.errorCode == Errors.NONE.code &&
groupsDescription1.size == 1 && groupsDescription1.head.members.size == 2
}, msg = s"Could not create grp-1 with 2 members successfully")
// grp-2 with 2 members
TestUtils.waitUntilTrue(() => {
grp2Member1Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = "member-3",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
grp2Member2Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = "member-4",
rebalanceTimeoutMs = timeoutMs,
activeTasks = List.empty,
standbyTasks = List.empty,
warmupTasks = List.empty,
topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(List(
new StreamsGroupHeartbeatRequestData.Subtopology()
.setSubtopologyId("subtopology-1")
.setSourceTopics(List(topicName).asJava)
.setRepartitionSinkTopics(List.empty.asJava)
.setRepartitionSourceTopics(List.empty.asJava)
.setStateChangelogTopics(List.empty.asJava)
).asJava)
)
val groupsDescription2 = streamsGroupDescribe(
groupIds = List("grp-2"),
includeAuthorizedOperations = true,
version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
)
grp2Member1Response.errorCode == Errors.NONE.code && grp2Member2Response.errorCode == Errors.NONE.code &&
groupsDescription2.size == 1 && groupsDescription2.head.members.size == 2
}, msg = s"Could not create grp-2 with 2 members successfully")
// Send follow-up heartbeats until both groups are stable
TestUtils.waitUntilTrue(() => {
grp1Member1Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = grp1Member1Response.memberId,
memberEpoch = grp1Member1Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp1Member1Response.activeTasks),
standbyTasks = convertTaskIds(grp1Member1Response.standbyTasks),
warmupTasks = convertTaskIds(grp1Member1Response.warmupTasks),
topology = null
)
grp1Member2Response = streamsGroupHeartbeat(
groupId = "grp-1",
memberId = grp1Member2Response.memberId,
memberEpoch = grp1Member2Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp1Member2Response.activeTasks),
standbyTasks = convertTaskIds(grp1Member2Response.standbyTasks),
warmupTasks = convertTaskIds(grp1Member2Response.warmupTasks),
topology = null
)
grp2Member1Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = grp2Member1Response.memberId,
memberEpoch = grp2Member1Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp2Member1Response.activeTasks),
standbyTasks = convertTaskIds(grp2Member1Response.standbyTasks),
warmupTasks = convertTaskIds(grp2Member1Response.warmupTasks),
topology = null
)
grp2Member2Response = streamsGroupHeartbeat(
groupId = "grp-2",
memberId = grp2Member2Response.memberId,
memberEpoch = grp2Member2Response.memberEpoch,
rebalanceTimeoutMs = timeoutMs,
activeTasks = convertTaskIds(grp2Member2Response.activeTasks),
standbyTasks = convertTaskIds(grp2Member2Response.standbyTasks),
warmupTasks = convertTaskIds(grp2Member2Response.warmupTasks),
topology = null
)
val actual = streamsGroupDescribe(
groupIds = List("grp-1","grp-2"),
includeAuthorizedOperations = true,
version = ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled).toShort
)
actual.head.groupState() == "Stable" && actual(1).groupState() == "Stable" &&
actual.head.members.size == 2 && actual(1).members.size == 2
}, "Two groups did not stabilize with 2 members each in time")
// Test the describe request for both groups in stable state
for (version <- ApiKeys.STREAMS_GROUP_DESCRIBE.oldestVersion() to ApiKeys.STREAMS_GROUP_DESCRIBE.latestVersion(isUnstableApiEnabled)) {
val actual = streamsGroupDescribe(
groupIds = List("grp-1","grp-2"),
includeAuthorizedOperations = true,
version = version.toShort
)
assertEquals(2, actual.size)
assertEquals(actual.map(_.groupId).toSet, Set("grp-1", "grp-2"))
for (describedGroup <- actual) {
assertEquals("Stable", describedGroup.groupState)
assertTrue("Group epoch is not equal to the assignment epoch", describedGroup.groupEpoch == describedGroup.assignmentEpoch)
// Verify topology
assertEquals(1, describedGroup.topology.epoch)
assertEquals(1, describedGroup.topology.subtopologies.size)
assertEquals("subtopology-1", describedGroup.topology.subtopologies.get(0).subtopologyId)
assertEquals(List(topicName).asJava, describedGroup.topology.subtopologies.get(0).sourceTopics)
// Verify members
assertEquals(2, describedGroup.members.size)
val expectedMemberIds = describedGroup.groupId match {
case "grp-1" => Set(grp1Member1Response.memberId, grp1Member2Response.memberId)
case "grp-2" => Set(grp2Member1Response.memberId, grp2Member2Response.memberId)
case unexpected => throw new AssertionError(s"Unexpected group ID: $unexpected")
}
val actualMemberIds = describedGroup.members.asScala.map(_.memberId).toSet
assertEquals(expectedMemberIds, actualMemberIds)
assertEquals(authorizedOperationsInt, describedGroup.authorizedOperations)
describedGroup.members.asScala.foreach { member =>
assertTrue("Group epoch is not equal to the member epoch", member.memberEpoch == describedGroup.assignmentEpoch)
assertEquals(1, member.topologyEpoch)
assertEquals(member.targetAssignment, member.assignment)
assertEquals(clientId, member.clientId())
assertEquals(clientHost, member.clientHost())
}
// Verify all partitions 0, 1, 2 are assigned exactly once
val allAssignedPartitions = describedGroup.members.asScala.flatMap { member =>
member.assignment.activeTasks.asScala.flatMap(_.partitions.asScala)
}.toList
assertEquals(List(0, 1, 2).sorted, allAssignedPartitions.sorted)
}
}
} finally {
admin.close()
}
}
private def convertTaskIds(responseTasks: java.util.List[StreamsGroupHeartbeatResponseData.TaskIds]): List[StreamsGroupHeartbeatRequestData.TaskIds] = {
if (responseTasks == null) {
List.empty
} else {
responseTasks.asScala.map { responseTask =>
new StreamsGroupHeartbeatRequestData.TaskIds()
.setSubtopologyId(responseTask.subtopologyId)
.setPartitions(responseTask.partitions)
}.toList
}
}
}

View File

@ -281,6 +281,11 @@ public class KafkaStreamsTelemetryIntegrationTest {
streamsApplicationProperties = props(groupProtocol);
final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();
shouldPassMetrics(topology, FIRST_INSTANCE_CLIENT);
shouldPassMetrics(topology, SECOND_INSTANCE_CLIENT);
}
private void shouldPassMetrics(final Topology topology, final int clientInstance) throws Exception {
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
@ -292,8 +297,8 @@ public class KafkaStreamsTelemetryIntegrationTest {
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics().stream().map(KafkaMetric::metricName).toList();
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(clientInstance).passedMetrics().stream().map(KafkaMetric::metricName).toList();
final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(clientInstance).passedMetrics.stream().map(KafkaMetric::metricName).toList();
assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());

View File

@ -71,6 +71,17 @@ public interface StateStore {
*/
void init(final StateStoreContext stateStoreContext, final StateStore root);
/**
* Assigns the store to a stream thread.
* <p>
* This method is called from the final (owning) stream thread,
* so it can be used to initialize resources that need to know the running thread, e.g. metrics.
* </p>
* To access the thread, use {@link Thread#currentThread()}.
*/
default void assignThread() { }
/**
* Flush any cached data
*/
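A minimal custom-store sketch against current trunk, assuming the new StateStore#assignThread() default shown above; the class name and its behavior are illustrative only:

import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;

// Hypothetical no-op store that captures the owning thread in assignThread().
public class ThreadAwareNoOpStore implements StateStore {
    private volatile Thread owningThread;
    private volatile boolean open;

    @Override
    public String name() {
        return "thread-aware-no-op-store";
    }

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        context.register(root, (key, value) -> { });   // no-op restore callback
        open = true;
    }

    @Override
    public void assignThread() {
        // Called once the owning stream thread is known; a safe place for per-thread setup.
        owningThread = Thread.currentThread();
    }

    @Override
    public void flush() { }

    @Override
    public void close() {
        open = false;
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}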

View File

@ -48,6 +48,11 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public void assignThread() {
throw new UnsupportedOperationException(ERROR_MESSAGE);
}
@Override
public void close() {
throw new UnsupportedOperationException(ERROR_MESSAGE);

View File

@ -78,6 +78,11 @@ public class ReadOnlyTask implements Task {
throw new UnsupportedOperationException("This task is read-only");
}
@Override
public void assignThread() {
throw new UnsupportedOperationException("This task is read-only");
}
@Override
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
throw new UnsupportedOperationException("This task is read-only");

View File

@ -27,6 +27,7 @@ import org.apache.kafka.streams.TopologyConfig.TaskConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
@ -45,8 +46,8 @@ import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
*/
public class StandbyTask extends AbstractTask implements Task {
private final boolean eosEnabled;
private final Sensor closeTaskSensor;
private final Sensor updateSensor;
private Sensor closeTaskSensor;
private Sensor updateSensor;
private final StreamsMetricsImpl streamsMetrics;
protected final InternalProcessorContext<?, ?> processorContext;
@ -83,8 +84,6 @@ public class StandbyTask extends AbstractTask implements Task {
this.streamsMetrics = streamsMetrics;
processorContext.transitionToStandby(cache);
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics);
this.eosEnabled = config.eosEnabled;
}
@ -129,6 +128,15 @@ public class StandbyTask extends AbstractTask implements Task {
}
}
@Override
public void assignThread() {
closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics);
updateSensor = TaskMetrics.updateSensor(Thread.currentThread().getName(), id.toString(), streamsMetrics);
for (final StateStore stateStore : topology.stateStores()) {
stateStore.assignThread();
}
}
@Override
public void completeRestoration(final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
throw new IllegalStateException("Standby task " + id + " should never be completing restoration");

View File

@ -42,6 +42,7 @@ import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Record;
@ -278,6 +279,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
}
@Override
public void assignThread() {
for (final StateStore stateStore : topology.stateStores()) {
stateStore.assignThread();
}
}
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
mainConsumer.pause(partitionsForOffsetReset);
resetOffsetsForPartitions.addAll(partitionsForOffsetReset);

View File

@ -110,6 +110,8 @@ public interface Task {
*/
void initializeIfNeeded();
void assignThread();
default void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
throw new UnsupportedOperationException();
}

View File

@ -343,6 +343,8 @@ public class TaskManager {
final TaskId taskId = entry.getKey();
final Task task = stateDirectory.removeStartupTask(taskId);
if (task != null) {
task.assignThread();
// replace our dummy values with the real ones, now that we know our thread and assignment
final Set<TopicPartition> inputPartitions = entry.getValue();
task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions);
@ -928,6 +930,7 @@ public class TaskManager {
for (final Task task : tasks.allTasks()) {
try {
task.initializeIfNeeded();
task.assignThread();
task.clearTaskTimeout();
} catch (final LockException lockException) {
// it is possible that if there are multiple threads within the instance that one thread
@ -1082,6 +1085,7 @@ public class TaskManager {
try {
if (canTryInitializeTask(task.id(), nowMs)) {
task.initializeIfNeeded();
task.assignThread();
taskIdToBackoffRecord.remove(task.id());
stateUpdater.add(task);
} else {
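Both initialization paths above follow the same ordering, which the store-level changes rely on: initialize the task first, then bind it to the thread that will run it. A condensed, hypothetical sketch of that ordering; only Task#initializeIfNeeded and the new Task#assignThread are real, the helper class and method are not:

import java.util.List;
import org.apache.kafka.streams.processor.internals.Task;

// Condensed sketch of the call order the TaskManager changes establish.
final class TaskAssignmentSketch {
    static void initializeAndAssign(final List<Task> tasks) {
        for (final Task task : tasks) {
            task.initializeIfNeeded();   // register and, if needed, restore state stores
            task.assignThread();         // owning stream thread is now known: create per-thread sensors
        }
    }
}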

View File

@ -27,7 +27,6 @@ import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -243,16 +242,6 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);
final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
@ -276,6 +265,15 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
);
}
@Override
public void assignThread() {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
ProcessorContextUtils.metricsImpl(internalProcessorContext)
);
}
@Override
public void flush() {
segments.flush();

View File

@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializati
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -294,16 +293,6 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);
final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
@ -325,6 +314,15 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
false);
}
@Override
public void assignThread() {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
ProcessorContextUtils.metricsImpl(internalProcessorContext)
);
}
@Override
public void flush() {
segments.flush();
@ -404,4 +402,4 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
public Position getPosition() {
return position;
}
}
}

View File

@ -104,6 +104,11 @@ public class CachingKeyValueStore
}
});
super.init(stateStoreContext, root);
}
@Override
public void assignThread() {
super.assignThread();
// save the stream thread as we only ever want to trigger a flush
// when the stream thread is the current thread.
streamThread = Thread.currentThread();
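For context, CachingKeyValueStore uses the captured thread to decide whether a lookup may evict (and therefore flush) dirty cache entries: roughly, the owning stream thread takes the write lock while other callers take the read lock. A standalone sketch of that kind of guard, not the store's actual code:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative guard only: reads on the stream thread may evict and flush cache entries,
// so that thread takes the write lock; other threads (e.g. interactive queries) take the read lock.
class StreamThreadGuardExample {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile Thread streamThread;            // captured in assignThread()

    void assignThread() {
        streamThread = Thread.currentThread();
    }

    byte[] get(final byte[] key) {
        final Lock theLock = Thread.currentThread().equals(streamThread)
            ? lock.writeLock()
            : lock.readLock();
        theLock.lock();
        try {
            return lookup(key);                      // may trigger eviction on the stream thread
        } finally {
            theLock.unlock();
        }
    }

    private byte[] lookup(final byte[] key) {
        return null;                                 // placeholder
    }
}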

View File

@ -25,10 +25,10 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
@ -78,6 +78,7 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
private StateStoreContext stateStoreContext;
private final Position position;
private TaskId taskId;
InMemorySessionStore(final String name,
final long retentionPeriod,
@ -97,22 +98,14 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
public void init(final StateStoreContext stateStoreContext,
final StateStore root) {
this.stateStoreContext = stateStoreContext;
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
taskId = stateStoreContext.taskId();
// The provided context is not required to implement InternalProcessorContext.
// If it doesn't, we can't record this metric.
if (stateStoreContext instanceof InternalProcessorContext) {
this.context = (InternalProcessorContext<?, ?>) stateStoreContext;
final StreamsMetricsImpl metrics = this.context.metrics();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);
} else {
this.context = null;
expiredRecordSensor = null;
}
if (root != null) {
@ -140,6 +133,19 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
open = true;
}
@Override
public void assignThread() {
if (context != null) {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
taskId.toString(),
this.context.metrics()
);
} else {
expiredRecordSensor = null;
}
}
@Override
public Position getPosition() {
return position;

View File

@ -202,6 +202,14 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
taskId = context.taskId().toString();
streamsMetrics = context.metrics();
this.context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
updateBufferMetrics();
open = true;
partition = context.taskId().partition();
}
@Override
public void assignThread() {
bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor(
taskId,
METRIC_SCOPE,
@ -214,11 +222,6 @@ public final class InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
storeName,
streamsMetrics
);
this.context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
updateBufferMetrics();
open = true;
partition = context.taskId().partition();
}
@Override

View File

@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializati
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
@ -104,15 +103,6 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final StateStore root) {
this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);
if (root != null) {
final boolean consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
stateStoreContext.appConfigs(),
@ -142,6 +132,15 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
open = true;
}
@Override
public void assignThread() {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
ProcessorContextUtils.metricsImpl(internalProcessorContext)
);
}
@Override
public Position getPosition() {
return position;

View File

@ -65,6 +65,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
@Override
public void openExisting(final StateStoreContext context, final long streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
metricsRecorder.assignThread();
super.openExisting(context, streamTime);
}
}
}

View File

@ -126,6 +126,11 @@ public class KeyValueStoreWrapper<K, V> implements StateStore {
store.init(stateStoreContext, root);
}
@Override
public void assignThread() {
store.assignThread();
}
@Override
public void flush() {
store.flush();

View File

@ -99,6 +99,11 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
store.init(stateStoreContext, root);
}
@Override
public void assignThread() {
store.assignThread();
}
@Override
public void flush() {
store.flush();
@ -235,4 +240,4 @@ public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueSt
return KeyValue.pair(next.key, convertToTimestampedFormat(next.value));
}
}
}
}

View File

@ -141,6 +141,11 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
throw new UnsupportedOperationException("cannot initialize a logical segment");
}
@Override
public void assignThread() {
throw new UnsupportedOperationException("nothing to reassign");
}
@Override
public void flush() {
throw new UnsupportedOperationException("nothing to flush for logical segment");
@ -368,4 +373,4 @@ class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segm
private static byte[] serializeLongToBytes(final long l) {
return ByteBuffer.allocate(Long.BYTES).putLong(l).array();
}
}
}

View File

@ -105,6 +105,7 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSeg
@Override
public void openExisting(final StateStoreContext context, final long streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
metricsRecorder.assignThread();
physicalStore.openDB(context.appConfigs(), context.stateDir());
}
@ -142,4 +143,4 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSeg
return super.segmentName(segmentId);
}
}
}

View File

@ -128,13 +128,15 @@ public class MeteredKeyValueStore<K, V>
initStoreSerde(stateStoreContext);
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
registerMetrics();
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
super.init(stateStoreContext, root);
}
@Override
public void assignThread() {
registerMetrics();
super.assignThread();
}
private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
@ -150,6 +152,7 @@ public class MeteredKeyValueStore<K, V>
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> openIterators.sum());
openIterators = new OpenIterators(taskId, metricsScope, name(), streamsMetrics);
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
}
@Override

View File

@ -108,12 +108,15 @@ public class MeteredSessionStore<K, V>
initStoreSerde(stateStoreContext);
streamsMetrics = (StreamsMetricsImpl) stateStoreContext.metrics();
registerMetrics();
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
super.init(stateStoreContext, root);
}
@Override
public void assignThread() {
registerMetrics();
super.assignThread();
}
private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
@ -129,6 +132,8 @@ public class MeteredSessionStore<K, V>
return openIteratorsIterator.hasNext() ? openIteratorsIterator.next().startTimestamp() : null;
}
);
restoreSensor = StateStoreMetrics.restoreSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
}
@Override

View File

@ -179,6 +179,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
false);
}
@Override
public void assignThread() {
metricsRecorder.assignThread();
}
@SuppressWarnings("unchecked")
void openDB(final Map<String, Object> configs, final File stateDir) {
// initialize the default rocksdb options
@ -1016,4 +1021,4 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
return null;
}
}
}
}

View File

@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializati
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
@ -352,16 +351,6 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
final String threadId = Thread.currentThread().getName();
final String taskName = stateStoreContext.taskId().toString();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
threadId,
taskName,
metrics
);
metricsRecorder.init(ProcessorContextUtils.metricsImpl(stateStoreContext), stateStoreContext.taskId());
final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
@ -386,6 +375,16 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
);
}
@Override
public void assignThread() {
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
Thread.currentThread().getName(),
internalProcessorContext.taskId().toString(),
ProcessorContextUtils.metricsImpl(internalProcessorContext)
);
metricsRecorder.assignThread();
}
// VisibleForTesting
void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
@ -1012,4 +1011,4 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
private static String latestValueStoreName(final String storeName) {
return storeName + ".latestValues";
}
}
}

View File

@ -65,6 +65,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
@Override
public void openExisting(final StateStoreContext context, final long streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
metricsRecorder.assignThread();
super.openExisting(context, streamTime);
}
}

View File

@ -92,6 +92,11 @@ public class VersionedKeyValueToBytesStoreAdapter implements VersionedBytesStore
inner.init(stateStoreContext, root);
}
@Override
public void assignThread() {
inner.assignThread();
}
@Override
public void flush() {
inner.flush();
@ -180,4 +185,4 @@ public class VersionedKeyValueToBytesStoreAdapter implements VersionedBytesStore
null,
ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp()));
}
}
}

View File

@ -160,6 +160,11 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
store.init(stateStoreContext, root);
}
@Override
public void assignThread() {
store.assignThread();
}
@Override
public void flush() {
store.flush();
@ -212,4 +217,4 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
}
}
}
}

View File

@ -63,6 +63,11 @@ public abstract class WrappedStateStore<S extends StateStore, K, V> implements S
wrapped.init(stateStoreContext, root);
}
@Override
public void assignThread() {
wrapped.assignThread();
}
@SuppressWarnings("unchecked")
@Override
public boolean setFlushListener(final CacheFlushListener<K, V> listener,

View File

@ -146,11 +146,14 @@ public class RocksDBMetricsRecorder {
+ "This is a bug in Kafka Streams. " +
"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues");
}
this.taskId = taskId;
this.streamsMetrics = streamsMetrics;
}
public void assignThread() {
final RocksDBMetricContext metricContext = new RocksDBMetricContext(taskId.toString(), metricsScope, storeName);
initSensors(streamsMetrics, metricContext);
initGauges(streamsMetrics, metricContext);
this.taskId = taskId;
this.streamsMetrics = streamsMetrics;
}
public void addValueProviders(final String segmentName,
@ -510,4 +513,4 @@ public class RocksDBMetricsRecorder {
}
return (double) sum / count;
}
}
}

View File

@ -4898,6 +4898,10 @@ public class TaskManagerTest {
}
}
@Override
public void assignThread() {
}
@Override
public void addPartitionsForOffsetReset(final Set<TopicPartition> partitionsForOffsetReset) {
this.partitionsForOffsetReset = partitionsForOffsetReset;

View File

@ -1213,6 +1213,11 @@ public class TopologyTestDriver implements Closeable {
inner.init(stateStoreContext, root);
}
@Override
public void assignThread() {
inner.assignThread();
}
@Override
public void put(final K key, final V value) {
inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
@ -1277,6 +1282,11 @@ public class TopologyTestDriver implements Closeable {
inner.init(stateStoreContext, root);
}
@Override
public void assignThread() {
inner.assignThread();
}
@Override
public void put(final K key,
final V value,