KAFKA-15045: (KIP-924 pt. 8) Added TopicPartitionAssignmentInfo (#16024)

For task assignment purposes, the user needs to have a set of information available for each topic partition affecting the desired tasks.

This PR introduces a new interface for a read-only container class that allows all the important and relevant information to be found in one place.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Antoine Pourchet 2024-05-22 16:52:53 -06:00 committed by GitHub
parent 27a6c156c4
commit 06739d5aa0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 125 additions and 0 deletions

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.assignment;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
/**
* This is a simple container class used during the assignment process to distinguish
* TopicPartitions type. Since the assignment logic can depend on the type of topic we're
* looking at, and the rack information of the partition, this container class should have
* everything necessary to make informed task assignment decisions.
*/
public interface TaskTopicPartition {
/**
*
* @return the {@code TopicPartition} for this task.
*/
TopicPartition topicPartition();
/**
*
* @return whether the underlying topic is a source topic or not. Source changelog topics
* are both source topics and changelog topics.
*/
boolean isSource();
/**
*
* @return whether the underlying topic is a changelog topic or not. Source changelog topics
* are both source topics and changelog topics.
*/
boolean isChangelog();
/**
*
* @return the broker rack ids on which this topic partition resides. If no information could
* be found, this will return an empty optional value.
*/
Optional<Set<String>> rackIds();
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.assignment.TaskTopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is a simple container class used during the assignment process to distinguish
* TopicPartitions type. Since the assignment logic can depend on the type of topic we're
* looking at, and the rack information of the partition, this container class should have
* everything necessary to make informed task assignment decisions.
*/
public class DefaultTaskTopicPartition implements TaskTopicPartition {
private static final Logger LOG = LoggerFactory.getLogger(DefaultTaskTopicPartition.class);
private final TopicPartition topicPartition;
private final boolean isSourceTopic;
private final boolean isChangelogTopic;
private final Optional<Set<String>> rackIds;
public DefaultTaskTopicPartition(final TopicPartition topicPartition,
final boolean isSourceTopic,
final boolean isChangelogTopic,
final Set<String> rackIds) {
this.topicPartition = topicPartition;
this.isSourceTopic = isSourceTopic;
this.isChangelogTopic = isChangelogTopic;
this.rackIds = Optional.ofNullable(rackIds);
}
@Override
public TopicPartition topicPartition() {
return topicPartition;
}
@Override
public boolean isSource() {
return isSourceTopic;
}
@Override
public boolean isChangelog() {
return isChangelogTopic;
}
@Override
public Optional<Set<String>> rackIds() {
return rackIds;
}
}