KAFKA-19135 Migrate initial IQ support for KIP-1071 from feature branch to trunk (#19588)

This PR migrates the initial IQ support for KIP-1071 from the
feature branch to trunk.  It includes a parameterized integration test
that expects the same results whether using the classic or the new
streams group protocol.

Note that this PR delivers IQ information in every heartbeat
response.  A follow-up PR will change that so IQ information is sent
only when assignments change.
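
For context, a minimal sketch (illustrative, not part of this diff) of how an application reads the propagated IQ metadata; the probe class is a hypothetical helper:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsMetadata;

public final class IqMetadataProbe {
    // Prints the active and standby partitions each Streams client advertises.
    // With the new streams group protocol this information now arrives via the
    // group heartbeat response; with the classic protocol it comes from the
    // rebalance metadata. Either way, the public API below is unchanged.
    public static void printHostMetadata(final KafkaStreams streams) {
        for (final StreamsMetadata metadata : streams.metadataForAllStreamsClients()) {
            System.out.println(metadata.hostInfo()
                + " active=" + metadata.topicPartitions()
                + " standby=" + metadata.standbyTopicPartitions());
        }
    }
}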

Reviewers: Lucas Brutschy <lucasbru@apache.org>
Bill Bejeck, 2025-04-29 20:08:49 -04:00, committed by GitHub
parent 3bb15c5dee
commit 431cffc93f
11 changed files with 620 additions and 20 deletions

StreamsGroupHeartbeatRequestManager.java

@@ -674,16 +674,24 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
         membershipManager.transitionToFatal();
     }

-    private static Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> convertHostInfoMap(final StreamsGroupHeartbeatResponseData data) {
-        Map<StreamsRebalanceData.HostInfo, List<TopicPartition>> partitionsByHost = new HashMap<>();
+    private static Map<StreamsRebalanceData.HostInfo, StreamsRebalanceData.EndpointPartitions> convertHostInfoMap(
+        final StreamsGroupHeartbeatResponseData data) {
+        Map<StreamsRebalanceData.HostInfo, StreamsRebalanceData.EndpointPartitions> partitionsByHost = new HashMap<>();
         data.partitionsByUserEndpoint().forEach(endpoint -> {
-            List<TopicPartition> topicPartitions = endpoint.partitions().stream()
-                .flatMap(partition ->
-                    partition.partitions().stream().map(partitionId -> new TopicPartition(partition.topic(), partitionId)))
-                .collect(Collectors.toList());
+            List<TopicPartition> activeTopicPartitions = getTopicPartitionList(endpoint.activePartitions());
+            List<TopicPartition> standbyTopicPartitions = getTopicPartitionList(endpoint.standbyPartitions());
             StreamsGroupHeartbeatResponseData.Endpoint userEndpoint = endpoint.userEndpoint();
-            partitionsByHost.put(new StreamsRebalanceData.HostInfo(userEndpoint.host(), userEndpoint.port()), topicPartitions);
+            StreamsRebalanceData.EndpointPartitions endpointPartitions = new StreamsRebalanceData.EndpointPartitions(activeTopicPartitions, standbyTopicPartitions);
+            partitionsByHost.put(new StreamsRebalanceData.HostInfo(userEndpoint.host(), userEndpoint.port()), endpointPartitions);
         });
         return partitionsByHost;
     }
+
+    static List<TopicPartition> getTopicPartitionList(List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitions) {
+        return topicPartitions.stream()
+            .flatMap(partition ->
+                partition.partitions().stream().map(partitionId -> new TopicPartition(partition.topic(), partitionId)))
+            .collect(Collectors.toList());
+    }
 }
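
For illustration (hand-built input, not from the diff): the new getTopicPartitionList helper flattens the response's per-topic partition arrays into client-side TopicPartition objects.

// Illustrative only: one response entry for topic "output" with partitions 0 and 1.
List<StreamsGroupHeartbeatResponseData.TopicPartition> fromResponse = List.of(
    new StreamsGroupHeartbeatResponseData.TopicPartition()
        .setTopic("output")
        .setPartitions(List.of(0, 1)));
// Flattens to [output-0, output-1].
List<TopicPartition> flattened =
    StreamsGroupHeartbeatRequestManager.getTopicPartitionList(fromResponse);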

StreamsRebalanceData.java

@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;

+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -119,6 +120,31 @@ public class StreamsRebalanceData {
         }
     }

+    public static class EndpointPartitions {
+
+        private final List<TopicPartition> activePartitions;
+        private final List<TopicPartition> standbyPartitions;
+
+        public EndpointPartitions(final List<TopicPartition> activePartitions,
+                                  final List<TopicPartition> standbyPartitions) {
+            this.activePartitions = activePartitions;
+            this.standbyPartitions = standbyPartitions;
+        }
+
+        public List<TopicPartition> activePartitions() {
+            return new ArrayList<>(activePartitions);
+        }
+
+        public List<TopicPartition> standbyPartitions() {
+            return new ArrayList<>(standbyPartitions);
+        }
+
+        @Override
+        public String toString() {
+            return "EndpointPartitions {"
+                + "activePartitions=" + activePartitions
+                + ", standbyPartitions=" + standbyPartitions
+                + '}';
+        }
+    }
+
     public static class Assignment {
@@ -297,7 +323,7 @@ public class StreamsRebalanceData {
     private final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(Assignment.EMPTY);

-    private final AtomicReference<Map<HostInfo, List<TopicPartition>>> partitionsByHost = new AtomicReference<>(Collections.emptyMap());
+    private final AtomicReference<Map<HostInfo, EndpointPartitions>> partitionsByHost = new AtomicReference<>(Collections.emptyMap());

     private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

@@ -341,11 +367,11 @@ public class StreamsRebalanceData {
         return reconciledAssignment.get();
     }

-    public void setPartitionsByHost(final Map<HostInfo, List<TopicPartition>> partitionsByHost) {
+    public void setPartitionsByHost(final Map<HostInfo, EndpointPartitions> partitionsByHost) {
         this.partitionsByHost.set(partitionsByHost);
     }

-    public Map<HostInfo, List<TopicPartition>> partitionsByHost() {
+    public Map<HostInfo, EndpointPartitions> partitionsByHost() {
         return partitionsByHost.get();
     }
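
A quick sketch of the new container's copy semantics (illustrative values): the accessors hand back fresh lists, so callers cannot mutate the stored assignment.

StreamsRebalanceData.EndpointPartitions endpointPartitions =
    new StreamsRebalanceData.EndpointPartitions(
        List.of(new TopicPartition("input", 0)),    // active
        List.of(new TopicPartition("input", 1)));   // standby
endpointPartitions.activePartitions().clear();      // clears only a defensive copy
// endpointPartitions.activePartitions() still returns [input-0]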

StreamsGroupHeartbeatResponse.json

@@ -70,8 +70,10 @@
     "fields": [
         { "name": "UserEndpoint", "type": "Endpoint", "versions": "0+",
           "about": "User-defined endpoint to connect to the node" },
-        { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
-          "about": "All partitions available on the node" }
+        { "name": "ActivePartitions", "type": "[]TopicPartition", "versions": "0+",
+          "about": "All topic partitions materialized by active tasks on the node" },
+        { "name": "StandbyPartitions", "type": "[]TopicPartition", "versions": "0+",
+          "about": "All topic partitions materialized by standby tasks on the node" }
     ]
 }
],
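
Server-side, an instance of the updated schema would be populated roughly like this (hypothetical host and partition values):

StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions =
    new StreamsGroupHeartbeatResponseData.EndpointToPartitions()
        .setUserEndpoint(new StreamsGroupHeartbeatResponseData.Endpoint()
            .setHost("localhost")
            .setPort(8080))
        .setActivePartitions(List.of(new StreamsGroupHeartbeatResponseData.TopicPartition()
            .setTopic("input").setPartitions(List.of(0))))
        .setStandbyPartitions(List.of(new StreamsGroupHeartbeatResponseData.TopicPartition()
            .setTopic("input").setPartitions(List.of(1))));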

StreamsGroupHeartbeatRequestManagerTest.java

@@ -153,7 +153,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
     List.of(
         new StreamsGroupHeartbeatResponseData.EndpointToPartitions()
             .setUserEndpoint(new StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(8080))
-            .setPartitions(List.of(
+            .setActivePartitions(List.of(
                 new StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("topic").setPartitions(List.of(0)))
         )
     );
@@ -591,9 +591,9 @@ class StreamsGroupHeartbeatRequestManagerTest {
         .get(new StreamsRebalanceData.HostInfo(
             ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
             ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
-        );
-    assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(), topicPartitions.get(0).topic());
-    assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0), topicPartitions.get(0).partition());
+        ).activePartitions();
+    assertEquals(ENDPOINT_TO_PARTITIONS.get(0).activePartitions().get(0).topic(), topicPartitions.get(0).topic());
+    assertEquals(ENDPOINT_TO_PARTITIONS.get(0).activePartitions().get(0).partitions().get(0), topicPartitions.get(0).partition());
     assertEquals(
         1.0,
         metrics.metric(metrics.metricName("heartbeat-total", "consumer-coordinator-metrics")).metricValue()

GroupMetadataManager.java

@@ -158,6 +158,7 @@ import org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor;
 import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
 import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
+import org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
 import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
 import org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
 import org.apache.kafka.image.MetadataDelta;
@@ -1982,7 +1983,8 @@ public class GroupMetadataManager {
     StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData()
         .setMemberId(updatedMember.memberId())
         .setMemberEpoch(updatedMember.memberEpoch())
-        .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+        .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
+        .setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));

     // The assignment is only provided in the following cases:
     // 1. The member is joining.
@@ -2093,6 +2095,25 @@
.collect(Collectors.toList());
}
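/**
 * Builds the per-endpoint IQ mapping for every group member that has advertised
 * a user endpoint. Returns null, which leaves the response field unset, when no
 * member exposes an endpoint.
 */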
private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group) {
List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
final Map<String, StreamsGroupMember> members = group.members();
for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) {
final StreamsGroupMember groupMember = entry.getValue();
final Optional<StreamsGroupMemberMetadataValue.Endpoint> endpointOptional = groupMember.userEndpoint();
if (endpointOptional.isPresent()) {
final StreamsGroupMemberMetadataValue.Endpoint endpoint = endpointOptional.get();
final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
responseEndpoint.setHost(endpoint.host());
responseEndpoint.setPort(endpoint.port());
StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, group);
endpointToPartitionsList.add(endpointToPartitions);
}
}
return endpointToPartitionsList.isEmpty() ? null : endpointToPartitionsList;
}
/**
* Handles a regular heartbeat from a consumer group member. It mainly consists of
* three parts:

EndpointToPartitionsManager.java

@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TopicMetadata;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class EndpointToPartitionsManager {
private EndpointToPartitionsManager() {
}
public static StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions(final StreamsGroupMember streamsGroupMember,
final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint,
final StreamsGroup streamsGroup) {
StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
Map<String, Set<Integer>> activeTasks = streamsGroupMember.assignedTasks().activeTasks();
Map<String, Set<Integer>> standbyTasks = streamsGroupMember.assignedTasks().standbyTasks();
endpointToPartitions.setUserEndpoint(responseEndpoint);
Map<String, ConfiguredSubtopology> configuredSubtopologies = streamsGroup.configuredTopology().flatMap(ConfiguredTopology::subtopologies).get();
List<StreamsGroupHeartbeatResponseData.TopicPartition> activeTopicPartitions = topicPartitions(activeTasks, configuredSubtopologies, streamsGroup.partitionMetadata());
List<StreamsGroupHeartbeatResponseData.TopicPartition> standbyTopicPartitions = topicPartitions(standbyTasks, configuredSubtopologies, streamsGroup.partitionMetadata());
endpointToPartitions.setActivePartitions(activeTopicPartitions);
endpointToPartitions.setStandbyPartitions(standbyTopicPartitions);
return endpointToPartitions;
}
private static List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitions(final Map<String, Set<Integer>> tasks,
final Map<String, ConfiguredSubtopology> configuredSubtopologies,
final Map<String, TopicMetadata> groupTopicMetadata) {
List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionsForTasks = new ArrayList<>();
for (Map.Entry<String, Set<Integer>> taskEntry : tasks.entrySet()) {
String subtopologyId = taskEntry.getKey();
ConfiguredSubtopology configuredSubtopology = configuredSubtopologies.get(subtopologyId);
Set<String> sourceTopics = configuredSubtopology.sourceTopics();
Set<String> repartitionSourceTopics = configuredSubtopology.repartitionSourceTopics().keySet();
Set<String> allSourceTopic = new HashSet<>(sourceTopics);
allSourceTopic.addAll(repartitionSourceTopics);
List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionList = topicPartitionListForTask(taskEntry.getValue(), allSourceTopic, groupTopicMetadata);
topicPartitionsForTasks.addAll(topicPartitionList);
}
return topicPartitionsForTasks;
}
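// For each source topic (regular and repartition) of the task's subtopology, emit one
// TopicPartition entry. Task ids double as partition numbers; when a topic has fewer
// partitions than there are tasks, the sorted task ids are truncated to the partition count.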
private static List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionListForTask(final Set<Integer> taskSet,
final Set<String> topicNames,
final Map<String, TopicMetadata> groupTopicMetadata) {
return topicNames.stream().map(topic -> {
int numPartitionsForTopic = groupTopicMetadata.get(topic).numPartitions();
StreamsGroupHeartbeatResponseData.TopicPartition tp = new StreamsGroupHeartbeatResponseData.TopicPartition();
tp.setTopic(topic);
List<Integer> tpPartitions = new ArrayList<>(taskSet);
if (numPartitionsForTopic < taskSet.size()) {
Collections.sort(tpPartitions);
tp.setPartitions(tpPartitions.subList(0, numPartitionsForTopic));
} else {
tp.setPartitions(tpPartitions);
}
return tp;
}).toList();
}
}

EndpointToPartitionsManagerTest.java

@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams.topics;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.TopicMetadata;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
class EndpointToPartitionsManagerTest {
private StreamsGroup streamsGroup;
private StreamsGroupMember streamsGroupMember;
private ConfiguredTopology configuredTopology;
private ConfiguredSubtopology configuredSubtopologyOne;
private ConfiguredSubtopology configuredSubtopologyTwo;
private final Map<String, Set<Integer>> activeTasks = new HashMap<>();
private final Map<String, Set<Integer>> standbyTasks = new HashMap<>();
private TasksTuple tasksTuple;
private final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
@BeforeEach
public void setUp() {
streamsGroup = mock(StreamsGroup.class);
streamsGroupMember = mock(StreamsGroupMember.class);
configuredTopology = mock(ConfiguredTopology.class);
configuredSubtopologyOne = new ConfiguredSubtopology(Set.of("Topic-A"), new HashMap<>(), new HashSet<>(), new HashMap<>());
Map<String, ConfiguredInternalTopic> repartitionSourceTopics = Map.of("Topic-B", new ConfiguredInternalTopic("Topic-B", 1, Optional.of((short) 1), Collections.emptyMap()));
configuredSubtopologyTwo = new ConfiguredSubtopology(new HashSet<>(), repartitionSourceTopics, new HashSet<>(), new HashMap<>());
SortedMap<String, ConfiguredSubtopology> defaultSubtopologies = new TreeMap<>();
defaultSubtopologies.put("0", configuredSubtopologyOne);
defaultSubtopologies.put("1", configuredSubtopologyTwo);
when(configuredTopology.subtopologies()).thenReturn(Optional.of(defaultSubtopologies));
responseEndpoint.setHost("localhost");
responseEndpoint.setPort(9092);
}
@Test
void testEndpointToPartitionsWithStandbyTaskAssignments() {
Map<String, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(), "Topic-A", 3));
topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(), "Topic-B", 3));
activeTasks.put("0", Set.of(0, 1, 2));
standbyTasks.put("1", Set.of(0, 1, 2));
tasksTuple = new TasksTuple(activeTasks, standbyTasks, Collections.emptyMap());
when(streamsGroupMember.assignedTasks()).thenReturn(tasksTuple);
when(streamsGroup.partitionMetadata()).thenReturn(topicMetadata);
when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
SortedMap<String, ConfiguredSubtopology> configuredSubtopologyMap = new TreeMap<>();
configuredSubtopologyMap.put("0", configuredSubtopologyOne);
configuredSubtopologyMap.put("1", configuredSubtopologyTwo);
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyMap));
StreamsGroupHeartbeatResponseData.EndpointToPartitions result =
EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup);
assertEquals(responseEndpoint, result.userEndpoint());
assertEquals(1, result.activePartitions().size());
assertEquals(1, result.standbyPartitions().size());
List<StreamsGroupHeartbeatResponseData.TopicPartition> activePartitions = result.activePartitions();
List<StreamsGroupHeartbeatResponseData.TopicPartition> standbyPartitions = result.standbyPartitions();
activePartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic));
standbyPartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic));
assertTopicPartitionsAssigned(activePartitions, "Topic-A");
assertTopicPartitionsAssigned(standbyPartitions, "Topic-B");
}
private static void assertTopicPartitionsAssigned(List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitions, String topicName) {
StreamsGroupHeartbeatResponseData.TopicPartition topicPartition = topicPartitions.stream().filter(tp -> tp.topic().equals(topicName)).findFirst().get();
assertEquals(topicName, topicPartition.topic());
assertEquals(List.of(0, 1, 2), topicPartition.partitions().stream().sorted().toList());
}
@ParameterizedTest(name = "{4}")
@MethodSource("argsProvider")
void testEndpointToPartitionsWithTwoTopicsAndDifferentPartitions(int topicAPartitions,
int topicBPartitions,
List<Integer> topicAExpectedPartitions,
List<Integer> topicBExpectedPartitions,
String testName
) {
Map<String, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put("Topic-A", new TopicMetadata(Uuid.randomUuid(), "Topic-A", topicAPartitions));
topicMetadata.put("Topic-B", new TopicMetadata(Uuid.randomUuid(), "Topic-B", topicBPartitions));
configuredSubtopologyOne = new ConfiguredSubtopology(Set.of("Topic-A", "Topic-B"), new HashMap<>(), new HashSet<>(), new HashMap<>());
activeTasks.put("0", Set.of(0, 1, 2, 3, 4));
when(streamsGroupMember.assignedTasks()).thenReturn(new TasksTuple(activeTasks, Collections.emptyMap(), Collections.emptyMap()));
when(streamsGroup.partitionMetadata()).thenReturn(topicMetadata);
when(streamsGroup.configuredTopology()).thenReturn(Optional.of(configuredTopology));
SortedMap<String, ConfiguredSubtopology> configuredSubtopologyOneMap = new TreeMap<>();
configuredSubtopologyOneMap.put("0", configuredSubtopologyOne);
when(configuredTopology.subtopologies()).thenReturn(Optional.of(configuredSubtopologyOneMap));
StreamsGroupHeartbeatResponseData.EndpointToPartitions result = EndpointToPartitionsManager.endpointToPartitions(streamsGroupMember, responseEndpoint, streamsGroup);
assertEquals(responseEndpoint, result.userEndpoint());
assertEquals(2, result.activePartitions().size());
List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitions = result.activePartitions();
topicPartitions.sort(Comparator.comparing(StreamsGroupHeartbeatResponseData.TopicPartition::topic));
StreamsGroupHeartbeatResponseData.TopicPartition topicAPartition = result.activePartitions().get(0);
assertEquals("Topic-A", topicAPartition.topic());
assertEquals(topicAExpectedPartitions, topicAPartition.partitions().stream().sorted().toList());
StreamsGroupHeartbeatResponseData.TopicPartition topicBPartition = result.activePartitions().get(1);
assertEquals("Topic-B", topicBPartition.topic());
assertEquals(topicBExpectedPartitions, topicBPartition.partitions().stream().sorted().toList());
}
static Stream<Arguments> argsProvider() {
return Stream.of(
arguments(2, 5, List.of(0, 1), List.of(0, 1, 2, 3, 4), "Should assign correct partitions when partitions differ between topics"),
arguments(3, 3, List.of(0, 1, 2), List.of(0, 1, 2), "Should assign correct partitions when partitions same between topics")
);
}
}

IQv2EndpointToPartitionsIntegrationTest.java

@@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Stream;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@Timeout(600)
@Tag("integration")
public class IQv2EndpointToPartitionsIntegrationTest {
private String appId;
private String inputTopicTwoPartitions;
private String outputTopicTwoPartitions;
private Properties streamsApplicationProperties = new Properties();
private Properties streamsSecondApplicationProperties = new Properties();
private static EmbeddedKafkaCluster cluster;
private static final int NUM_BROKERS = 3;
private static final String EXPECTED_STORE_NAME = "IQTest-count";
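// The cluster is created inside each test case rather than in a @BeforeAll hook because
// the STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG broker setting depends on the test parameters.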
public void startCluster(final int standbyConfig) throws IOException {
final Properties properties = new Properties();
properties.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, standbyConfig);
cluster = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(NUM_BROKERS, properties);
cluster.start();
}
public void setUp() throws InterruptedException {
appId = safeUniqueTestName("endpointIntegrationTest");
inputTopicTwoPartitions = appId + "-input-two";
outputTopicTwoPartitions = appId + "-output-two";
cluster.createTopic(inputTopicTwoPartitions, 2, 1);
cluster.createTopic(outputTopicTwoPartitions, 2, 1);
}
public void closeCluster() {
cluster.stop();
}
@AfterEach
public void tearDown() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(streamsApplicationProperties);
if (!streamsSecondApplicationProperties.isEmpty()) {
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
}
}
@ParameterizedTest(name = "{3}")
@MethodSource("groupProtocolParameters")
public void shouldGetCorrectHostPartitionInformation(final String groupProtocolConfig,
final boolean usingStandbyReplicas,
final int numStandbyReplicas,
final String testName) throws Exception {
try {
startCluster(usingStandbyReplicas ? numStandbyReplicas : 0);
setUp();
final Properties streamOneProperties = new Properties();
streamOneProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks1");
streamOneProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks1");
streamOneProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:2020");
streamOneProperties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocolConfig);
if (usingStandbyReplicas) {
streamOneProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
}
streamsApplicationProperties = props(streamOneProperties);
final Properties streamTwoProperties = new Properties();
streamTwoProperties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2");
streamTwoProperties.put(StreamsConfig.CLIENT_ID_CONFIG, appId + "-ks2");
streamTwoProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:3030");
streamTwoProperties.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocolConfig);
if (usingStandbyReplicas) {
streamTwoProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
}
streamsSecondApplicationProperties = props(streamTwoProperties);
final Topology topology = complexTopology();
try (final KafkaStreams streamsOne = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);
waitForCondition(() -> !streamsOne.metadataForAllStreamsClients().isEmpty(),
IntegrationTestUtils.DEFAULT_TIMEOUT,
() -> "Kafka Streams didn't get metadata about the client.");
waitForCondition(() -> streamsOne.metadataForAllStreamsClients().iterator().next().topicPartitions().size() == 4,
IntegrationTestUtils.DEFAULT_TIMEOUT,
() -> "Kafka Streams one didn't get 4 tasks");
final List<StreamsMetadata> streamsMetadataAllClients = new ArrayList<>(streamsOne.metadataForAllStreamsClients());
assertEquals(1, streamsMetadataAllClients.size());
final StreamsMetadata streamsOneInitialMetadata = streamsMetadataAllClients.get(0);
assertEquals(2020, streamsOneInitialMetadata.hostInfo().port());
final Set<TopicPartition> topicPartitions = streamsOneInitialMetadata.topicPartitions();
assertEquals(4, topicPartitions.size());
assertEquals(0, streamsOneInitialMetadata.standbyTopicPartitions().size());
final long repartitionTopicTaskCount = topicPartitions.stream().filter(tp -> tp.topic().contains("-repartition")).count();
final long sourceTopicTaskCount = topicPartitions.stream().filter(tp -> tp.topic().contains("-input-two")).count();
assertEquals(2, repartitionTopicTaskCount);
assertEquals(2, sourceTopicTaskCount);
final int expectedStandbyCount = usingStandbyReplicas ? 1 : 0;
try (final KafkaStreams streamsTwo = new KafkaStreams(topology, streamsSecondApplicationProperties)) {
streamsTwo.start();
waitForCondition(() -> KafkaStreams.State.RUNNING == streamsTwo.state() && KafkaStreams.State.RUNNING == streamsOne.state(),
IntegrationTestUtils.DEFAULT_TIMEOUT,
() -> "Kafka Streams one or two never transitioned to a RUNNING state.");
waitForCondition(() -> {
final ThreadMetadata threadMetadata = streamsOne.metadataForLocalThreads().iterator().next();
return threadMetadata.activeTasks().size() == 2 && threadMetadata.standbyTasks().size() == expectedStandbyCount;
}, TestUtils.DEFAULT_MAX_WAIT_MS,
"KafkaStreams one never released active tasks and received standby task");
waitForCondition(() -> {
final ThreadMetadata threadMetadata = streamsTwo.metadataForLocalThreads().iterator().next();
return threadMetadata.activeTasks().size() == 2 && threadMetadata.standbyTasks().size() == expectedStandbyCount;
}, TestUtils.DEFAULT_MAX_WAIT_MS,
"KafkaStreams two never received active tasks and standby");
waitForCondition(() -> {
final List<StreamsMetadata> metadata = new ArrayList<>(streamsTwo.metadataForAllStreamsClients());
return metadata.size() == 2 &&
metadata.get(0).standbyTopicPartitions().size() == expectedStandbyCount &&
metadata.get(1).standbyTopicPartitions().size() == expectedStandbyCount;
}, TestUtils.DEFAULT_MAX_WAIT_MS,
"Kafka Streams clients 1 and 2 never got metadata about standby tasks");
waitForCondition(() -> streamsOne.metadataForAllStreamsClients().iterator().next().topicPartitions().size() == 2,
IntegrationTestUtils.DEFAULT_TIMEOUT,
() -> "Kafka Streams one didn't give up active tasks");
final List<StreamsMetadata> allClientMetadataUpdated = new ArrayList<>(streamsTwo.metadataForAllStreamsClients());
final StreamsMetadata streamsOneMetadata = allClientMetadataUpdated.get(0);
final Set<TopicPartition> streamsOneActiveTopicPartitions = streamsOneMetadata.topicPartitions();
final Set<TopicPartition> streamsOneStandbyTopicPartitions = streamsOneMetadata.standbyTopicPartitions();
final Set<String> streamsOneStoreNames = streamsOneMetadata.stateStoreNames();
final Set<String> streamsOneStandbyStoreNames = streamsOneMetadata.standbyStateStoreNames();
assertEquals(2020, streamsOneMetadata.hostInfo().port());
assertEquals(2, streamsOneActiveTopicPartitions.size());
assertEquals(expectedStandbyCount, streamsOneStandbyTopicPartitions.size());
assertEquals(1, streamsOneStoreNames.size());
assertEquals(expectedStandbyCount, streamsOneStandbyStoreNames.size());
assertEquals(EXPECTED_STORE_NAME, streamsOneStoreNames.iterator().next());
if (usingStandbyReplicas) {
assertEquals(EXPECTED_STORE_NAME, streamsOneStandbyStoreNames.iterator().next());
}
final long streamsOneRepartitionTopicCount = streamsOneActiveTopicPartitions.stream().filter(tp -> tp.topic().contains("-repartition")).count();
final long streamsOneSourceTopicCount = streamsOneActiveTopicPartitions.stream().filter(tp -> tp.topic().contains("-input-two")).count();
assertEquals(1, streamsOneRepartitionTopicCount);
assertEquals(1, streamsOneSourceTopicCount);
final StreamsMetadata streamsTwoMetadata = allClientMetadataUpdated.get(1);
final Set<TopicPartition> streamsTwoActiveTopicPartitions = streamsTwoMetadata.topicPartitions();
final Set<TopicPartition> streamsTwoStandbyTopicPartitions = streamsTwoMetadata.standbyTopicPartitions();
final Set<String> streamsTwoStateStoreNames = streamsTwoMetadata.stateStoreNames();
final Set<String> streamsTwoStandbyStateStoreNames = streamsTwoMetadata.standbyStateStoreNames();
assertEquals(3030, streamsTwoMetadata.hostInfo().port());
assertEquals(2, streamsTwoActiveTopicPartitions.size());
assertEquals(expectedStandbyCount, streamsTwoStandbyTopicPartitions.size());
assertEquals(1, streamsTwoStateStoreNames.size());
assertEquals(expectedStandbyCount, streamsTwoStandbyStateStoreNames.size());
assertEquals(EXPECTED_STORE_NAME, streamsTwoStateStoreNames.iterator().next());
if (usingStandbyReplicas) {
assertEquals(EXPECTED_STORE_NAME, streamsTwoStandbyStateStoreNames.iterator().next());
}
final long streamsTwoRepartitionTopicCount = streamsTwoActiveTopicPartitions.stream().filter(tp -> tp.topic().contains("-repartition")).count();
final long streamsTwoSourceTopicCount = streamsTwoActiveTopicPartitions.stream().filter(tp -> tp.topic().contains("-input-two")).count();
assertEquals(1, streamsTwoRepartitionTopicCount);
assertEquals(1, streamsTwoSourceTopicCount);
if (usingStandbyReplicas) {
final TopicPartition streamsOneStandbyTopicPartition = streamsOneStandbyTopicPartitions.iterator().next();
final TopicPartition streamsTwoStandbyTopicPartition = streamsTwoStandbyTopicPartitions.iterator().next();
final String streamsOneStandbyTopicName = streamsOneStandbyTopicPartition.topic();
final String streamsTwoStandbyTopicName = streamsTwoStandbyTopicPartition.topic();
assertEquals(streamsOneStandbyTopicName, streamsTwoStandbyTopicName);
assertNotEquals(streamsOneStandbyTopicPartition.partition(), streamsTwoStandbyTopicPartition.partition());
}
}
}
} finally {
closeCluster();
}
}
private static Stream<Arguments> groupProtocolParameters() {
return Stream.of(Arguments.of("streams", false, 0, "STREAMS protocol No standby"),
Arguments.of("classic", false, 0, "CLASSIC protocol No standby"),
Arguments.of("streams", true, 1, "STREAMS protocol With standby"),
Arguments.of("classic", true, 1, "CLASSIC protocol With standby"));
}
private Properties props(final Properties extraProperties) {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.putAll(extraProperties);
return streamsConfiguration;
}
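// The groupBy below forces a repartition topic, so with two input partitions the
// application owns four topic partitions in total: two source and two repartition.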
private Topology complexTopology() {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopicTwoPartitions, Consumed.with(Serdes.String(), Serdes.String()))
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> value, Grouped.as("IQTest"))
.count(Materialized.as(EXPECTED_STORE_NAME))
.toStream().to(outputTopicTwoPartitions, Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();
}
}

StreamThread.java

@@ -1494,6 +1494,21 @@ public class StreamThread extends Thread implements ProcessingThread {
shutdownErrorHook.run();
}
}
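// New for KIP-1071 IQ support: fan the per-endpoint partitions delivered in the
// streams group heartbeat out into the active/standby host maps that
// StreamsMetadataState#onChange already consumes.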
final Map<StreamsRebalanceData.HostInfo, StreamsRebalanceData.EndpointPartitions> partitionsByEndpoint =
streamsRebalanceData.get().partitionsByHost();
final Map<HostInfo, Set<TopicPartition>> activeHostInfoMap = new HashMap<>();
final Map<HostInfo, Set<TopicPartition>> standbyHostInfoMap = new HashMap<>();
partitionsByEndpoint.forEach((hostInfo, endpointPartitions) -> {
activeHostInfoMap.put(new HostInfo(hostInfo.host(), hostInfo.port()), new HashSet<>(endpointPartitions.activePartitions()));
standbyHostInfoMap.put(new HostInfo(hostInfo.host(), hostInfo.port()), new HashSet<>(endpointPartitions.standbyPartitions()));
});
streamsMetadataState.onChange(
activeHostInfoMap,
standbyHostInfoMap,
getTopicPartitionInfo(activeHostInfoMap)
);
}
}

StreamThreadTest.java

@@ -3805,6 +3805,11 @@ public class StreamThreadTest {
final Runnable shutdownErrorHook = mock(Runnable.class);
final Properties props = configProps(false, false, false);
final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(
new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(props)),
StreamsMetadataState.UNKNOWN_HOST,
new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
);
final StreamsConfig config = new StreamsConfig(props);
thread = new StreamThread(
new MockTime(1),
@@ -3828,7 +3833,7 @@ public class StreamThreadTest {
     HANDLER,
     null,
     Optional.of(streamsRebalanceData),
-    null
+    streamsMetadataState
 ).updateThreadMetadata(adminClientId(CLIENT_ID));

 thread.setState(State.STARTING);
@@ -3860,6 +3865,11 @@
final Properties props = configProps(false, false, false);
final Runnable shutdownErrorHook = mock(Runnable.class);
final StreamsConfig config = new StreamsConfig(props);
final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(
new TopologyMetadata(internalTopologyBuilder, config),
StreamsMetadataState.UNKNOWN_HOST,
new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
);
thread = new StreamThread(
new MockTime(1),
config,
@@ -3882,7 +3892,7 @@ public class StreamThreadTest {
     HANDLER,
     null,
     Optional.of(streamsRebalanceData),
-    null
+    streamsMetadataState
 ).updateThreadMetadata(adminClientId(CLIENT_ID));

 thread.setState(State.STARTING);

TestUtils.java

@@ -90,7 +90,7 @@ public class TestUtils {
         return safeUniqueTestName(methodName);
     }

-    private static String safeUniqueTestName(final String testName) {
+    public static String safeUniqueTestName(final String testName) {
         return sanitize(testName + Uuid.randomUuid().toString());
     }