KAFKA-12779: KIP-740, Clean up public API in TaskId and fix TaskMetadata#taskId() (#10735)

As described in KIP-740, we clean up the public TaskId class and introduce new APIs to return it from TaskMetadata

Reviewers: Guozhang Wang <guozhang@confluent.io>
A. Sophie Blee-Goldman, 2021-05-20 15:01:23 -07:00, committed by GitHub
parent 26b5352260
commit b56d9e4416
25 changed files with 239 additions and 174 deletions
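
Example of the new API introduced here (a sketch -- the `streams` variable is an assumed KafkaStreams instance, not part of this diff):

    for (final ThreadMetadata thread : streams.localThreadsMetadata()) {
        for (final TaskMetadata taskMetadata : thread.activeTasks()) {
            final TaskId id = taskMetadata.getTaskId();   // new; replaces the deprecated String taskId()
            final int subtopology = id.subtopology();     // replaces the deprecated public topicGroupId field
            final int partition = id.partition();         // replaces the deprecated public partition field
        }
    }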

View File

@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.processor;
 
-import org.apache.kafka.streams.errors.TaskIdFormatException;
-
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -26,21 +24,25 @@ import java.util.Objects;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.MIN_NAMED_TOPOLOGY_VERSION;
+import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.readTaskIdFrom;
+import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.writeTaskIdTo;
 
 /**
- * The task ID representation composed as topic group ID plus the assigned partition ID.
+ * The task ID representation composed as subtopology (aka topicGroupId) plus the assigned partition ID.
  */
 public class TaskId implements Comparable<TaskId> {
 
     private static final Logger LOG = LoggerFactory.getLogger(TaskId.class);
 
-    /** The ID of the topic group. */
+    /** The ID of the subtopology, aka topicGroupId. */
+    @Deprecated
     public final int topicGroupId;
+
     /** The ID of the partition. */
+    @Deprecated
     public final int partition;
+
     /** The namedTopology that this task belongs to, or null if it does not belong to one */
-    protected final String namedTopology;
+    private final String namedTopology;
 
     public TaskId(final int topicGroupId, final int partition) {
         this(topicGroupId, partition, null);
@@ -58,112 +60,58 @@ public class TaskId implements Comparable<TaskId> {
         }
     }
 
+    public int subtopology() {
+        return topicGroupId;
+    }
+
+    public int partition() {
+        return partition;
+    }
+
+    /**
+     * Experimental feature -- will return null
+     */
+    public String namedTopology() {
+        return namedTopology;
+    }
+
     @Override
     public String toString() {
         return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition;
     }
 
-    public String toTaskDirString() {
-        return topicGroupId + "_" + partition;
-    }
-
-    /**
-     * Parse the task directory name (of the form topicGroupId_partition) and construct the TaskId with the
-     * optional namedTopology (may be null)
-     *
-     * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId}
-     */
-    public static TaskId parseTaskDirectoryName(final String taskIdStr, final String namedTopology) {
-        final int index = taskIdStr.indexOf('_');
-        if (index <= 0 || index + 1 >= taskIdStr.length()) {
-            throw new TaskIdFormatException(taskIdStr);
-        }
-        try {
-            final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, index));
-            final int partition = Integer.parseInt(taskIdStr.substring(index + 1));
-            return new TaskId(topicGroupId, partition, namedTopology);
-        } catch (final Exception e) {
-            throw new TaskIdFormatException(taskIdStr);
-        }
-    }
-
     /**
      * @throws IOException if cannot write to output stream
+     * @deprecated since 3.0, for internal use, will be removed
      */
+    @Deprecated
     public void writeTo(final DataOutputStream out, final int version) throws IOException {
-        out.writeInt(topicGroupId);
-        out.writeInt(partition);
-        if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
-            if (namedTopology != null) {
-                out.writeInt(namedTopology.length());
-                out.writeChars(namedTopology);
-            } else {
-                out.writeInt(0);
-            }
-        }
+        writeTaskIdTo(this, out, version);
     }
 
     /**
      * @throws IOException if cannot read from input stream
+     * @deprecated since 3.0, for internal use, will be removed
      */
+    @Deprecated
     public static TaskId readFrom(final DataInputStream in, final int version) throws IOException {
-        final int topicGroupId = in.readInt();
-        final int partition = in.readInt();
-        final String namedTopology;
-        if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
-            final int numNamedTopologyChars = in.readInt();
-            final StringBuilder namedTopologyBuilder = new StringBuilder();
-            for (int i = 0; i < numNamedTopologyChars; ++i) {
-                namedTopologyBuilder.append(in.readChar());
-            }
-            namedTopology = namedTopologyBuilder.toString();
-        } else {
-            namedTopology = null;
-        }
-        return new TaskId(topicGroupId, partition, getNamedTopologyOrElseNull(namedTopology));
-    }
-
-    public void writeTo(final ByteBuffer buf, final int version) {
-        buf.putInt(topicGroupId);
-        buf.putInt(partition);
-        if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
-            if (namedTopology != null) {
-                buf.putInt(namedTopology.length());
-                for (final char c : namedTopology.toCharArray()) {
-                    buf.putChar(c);
-                }
-            } else {
-                buf.putInt(0);
-            }
-        }
-    }
-
-    public static TaskId readFrom(final ByteBuffer buf, final int version) {
-        final int topicGroupId = buf.getInt();
-        final int partition = buf.getInt();
-        final String namedTopology;
-        if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
-            final int numNamedTopologyChars = buf.getInt();
-            final StringBuilder namedTopologyBuilder = new StringBuilder();
-            for (int i = 0; i < numNamedTopologyChars; ++i) {
-                namedTopologyBuilder.append(buf.getChar());
-            }
-            namedTopology = namedTopologyBuilder.toString();
-        } else {
-            namedTopology = null;
-        }
-        return new TaskId(topicGroupId, partition, getNamedTopologyOrElseNull(namedTopology));
+        return readTaskIdFrom(in, version);
     }
 
     /**
-     * @return the namedTopology name, or null if the passed in namedTopology is null or the empty string
+     * @deprecated since 3.0, for internal use, will be removed
      */
-    private static String getNamedTopologyOrElseNull(final String namedTopology) {
-        return (namedTopology == null || namedTopology.length() == 0) ?
-            null :
-            namedTopology;
+    @Deprecated
+    public void writeTo(final ByteBuffer buf, final int version) {
+        writeTaskIdTo(this, buf, version);
+    }
+
+    /**
+     * @deprecated since 3.0, for internal use, will be removed
+     */
+    @Deprecated
+    public static TaskId readFrom(final ByteBuffer buf, final int version) {
+        return readTaskIdFrom(buf, version);
     }
 
     @Override

View File

@@ -30,7 +30,7 @@ import java.util.Set;
  */
 public class TaskMetadata {
 
-    private final String taskId;
+    private final TaskId taskId;
 
     private final Set<TopicPartition> topicPartitions;
@@ -40,7 +40,7 @@ public class TaskMetadata {
 
     private final Optional<Long> timeCurrentIdlingStarted;
 
-    public TaskMetadata(final String taskId,
+    public TaskMetadata(final TaskId taskId,
                         final Set<TopicPartition> topicPartitions,
                         final Map<TopicPartition, Long> committedOffsets,
                         final Map<TopicPartition, Long> endOffsets,
@@ -52,10 +52,22 @@ public class TaskMetadata {
         this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
     }
 
-    public String taskId() {
+    /**
+     * @return the basic task metadata such as subtopology and partition id
+     */
+    public TaskId getTaskId() {
         return taskId;
     }
 
+    /**
+     * @return the basic task metadata such as subtopology and partition id
+     * @deprecated please use {@link #getTaskId()} instead.
+     */
+    @Deprecated
+    public String taskId() {
+        return taskId.toString();
+    }
+
     public Set<TopicPartition> topicPartitions() {
         return topicPartitions;
     }

View File

@@ -143,7 +143,7 @@ class ActiveTaskCreator {
         final LogContext logContext = getLogContext(taskId);
 
-        final ProcessorTopology topology = builder.buildSubtopology(taskId.topicGroupId);
+        final ProcessorTopology topology = builder.buildSubtopology(taskId.subtopology());
 
         final ProcessorStateManager stateManager = new ProcessorStateManager(
             taskId,
@@ -194,7 +194,7 @@ class ActiveTaskCreator {
             inputPartitions,
             consumer,
             logContext,
-            builder.buildSubtopology(standbyTask.id.topicGroupId),
+            builder.buildSubtopology(standbyTask.id.subtopology()),
             stateManager,
             context
         );

View File

@@ -75,7 +75,7 @@ public class ChangelogTopics {
                 final Set<TopicPartition> changelogTopicPartitions = topicsInfo.stateChangelogTopics
                     .keySet()
                     .stream()
-                    .map(topic -> new TopicPartition(topic, task.partition))
+                    .map(topic -> new TopicPartition(topic, task.partition()))
                     .collect(Collectors.toSet());
                 changelogPartitionsForStatefulTask.put(task, changelogTopicPartitions);
             }
@@ -84,8 +84,8 @@ public class ChangelogTopics {
                 // the expected number of partitions is the max value of TaskId.partition + 1
                 int numPartitions = UNKNOWN;
                 for (final TaskId task : topicGroupTasks) {
-                    if (numPartitions < task.partition + 1) {
-                        numPartitions = task.partition + 1;
+                    if (numPartitions < task.partition() + 1) {
+                        numPartitions = task.partition() + 1;
                     }
                 }
                 topicConfig.setNumberOfPartitions(numPartitions);

View File

@@ -615,7 +615,7 @@ public class ProcessorStateManager implements StateManager {
         // NOTE we assume the partition of the topic can always be inferred from the task id;
         // if user ever use a custom partition grouper (deprecated in KIP-528) this would break and
         // it is not a regression (it would always break anyways)
-        return new TopicPartition(changelogFor(storeName), taskId.partition);
+        return new TopicPartition(changelogFor(storeName), taskId.partition());
     }
 
     private boolean isLoggingEnabled(final String storeName) {

View File

@@ -74,7 +74,7 @@ class StandbyTaskCreator {
             final TaskId taskId = newTaskAndPartitions.getKey();
             final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
 
-            final ProcessorTopology topology = builder.buildSubtopology(taskId.topicGroupId);
+            final ProcessorTopology topology = builder.buildSubtopology(taskId.subtopology());
 
             if (topology.hasStateWithChangelogs()) {
                 final ProcessorStateManager stateManager = new ProcessorStateManager(
@@ -120,7 +120,7 @@ class StandbyTaskCreator {
         return createStandbyTask(
             streamTask.id(),
             inputPartitions,
-            builder.buildSubtopology(streamTask.id.topicGroupId),
+            builder.buildSubtopology(streamTask.id.subtopology()),
             stateManager,
             context
         );

View File

@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
+import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 
 /**
  * Manages the directories where the state of Tasks owned by a {@link StreamThread} are
@@ -388,7 +389,7 @@ public class StateDirectory {
     private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
         for (final File taskDir : listNonEmptyTaskDirectories()) {
             final String dirName = taskDir.getName();
-            final TaskId id = TaskId.parseTaskDirectoryName(dirName, null);
+            final TaskId id = parseTaskDirectoryName(dirName, null);
             if (!lockedTasksToOwner.containsKey(id)) {
                 try {
                     if (lock(id)) {
@@ -421,7 +422,7 @@ public class StateDirectory {
         final AtomicReference<Exception> firstException = new AtomicReference<>();
         for (final File taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.getName();
-            final TaskId id = TaskId.parseTaskDirectoryName(dirName, null);
+            final TaskId id = parseTaskDirectoryName(dirName, null);
             try {
                 log.info("{} Deleting state directory {} for task {} as user calling cleanup.",
                     logPrefix(), dirName, id);

View File

@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
@@ -152,4 +153,34 @@ final class StateManagerUtil {
             throw exception;
         }
     }
+
+    /**
+     * Parse the task directory name (of the form topicGroupId_partition) and construct the TaskId with the
+     * optional namedTopology (may be null)
+     *
+     * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId}
+     */
+    static TaskId parseTaskDirectoryName(final String taskIdStr, final String namedTopology) {
+        final int index = taskIdStr.indexOf('_');
+        if (index <= 0 || index + 1 >= taskIdStr.length()) {
+            throw new TaskIdFormatException(taskIdStr);
+        }
+        try {
+            final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, index));
+            final int partition = Integer.parseInt(taskIdStr.substring(index + 1));
+            return new TaskId(topicGroupId, partition, namedTopology);
+        } catch (final Exception e) {
+            throw new TaskIdFormatException(taskIdStr);
+        }
+    }
+
+    /**
+     * @return The string representation of the subtopology and partition metadata, ie the task id string without
+     * the named topology, which defines the innermost task directory name of this task's state
+     */
+    static String toTaskDirString(final TaskId taskId) {
+        return taskId.subtopology() + "_" + taskId.partition();
+    }
 }
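
A quick illustration of the round trip these package-private helpers provide (values are illustrative; the methods are callable only within org.apache.kafka.streams.processor.internals):

    // a state directory named "2_0" parses back to subtopology 2, partition 0
    final TaskId id = StateManagerUtil.parseTaskDirectoryName("2_0", null);  // namedTopology may be null
    // id.subtopology() == 2, id.partition() == 0
    final String dirName = StateManagerUtil.toTaskDirString(id);             // "2_0" again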

View File

@@ -1141,7 +1141,7 @@ public class StreamThread extends Thread {
         final Set<TaskMetadata> activeTasksMetadata = new HashSet<>();
         for (final Map.Entry<TaskId, Task> task : activeTasks.entrySet()) {
             activeTasksMetadata.add(new TaskMetadata(
-                task.getValue().id().toString(),
+                task.getValue().id(),
                 task.getValue().inputPartitions(),
                 task.getValue().committedOffsets(),
                 task.getValue().highWaterMark(),
@@ -1151,7 +1151,7 @@ public class StreamThread extends Thread {
         final Set<TaskMetadata> standbyTasksMetadata = new HashSet<>();
         for (final Map.Entry<TaskId, Task> task : standbyTasks.entrySet()) {
             standbyTasksMetadata.add(new TaskMetadata(
-                task.getValue().id().toString(),
+                task.getValue().id(),
                 task.getValue().inputPartitions(),
                 task.getValue().committedOffsets(),
                 task.getValue().highWaterMark(),

View File

@@ -50,7 +50,6 @@ import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
 import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
 import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
-import org.apache.kafka.streams.processor.internals.namedtopology.NamedTaskId;
 import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
@@ -516,7 +515,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
             }
             allAssignedPartitions.addAll(partitions);
 
-            tasksForTopicGroup.computeIfAbsent(new Subtopology(id.topicGroupId, NamedTaskId.namedTopology(id)), k -> new HashSet<>()).add(id);
+            tasksForTopicGroup.computeIfAbsent(new Subtopology(id.subtopology(), id.namedTopology()), k -> new HashSet<>()).add(id);
         }
 
         checkAllPartitions(allSourceTopics, partitionsForTask, allAssignedPartitions, fullMetadata);

View File

@@ -64,6 +64,7 @@ import java.util.stream.Stream;
 import static org.apache.kafka.common.utils.Utils.intersection;
 import static org.apache.kafka.common.utils.Utils.union;
+import static org.apache.kafka.streams.processor.internals.StateManagerUtil.parseTaskDirectoryName;
 import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA;
 import static org.apache.kafka.streams.processor.internals.StreamThread.ProcessingMode.EXACTLY_ONCE_V2;
@@ -683,7 +684,7 @@ public class TaskManager {
         for (final File dir : stateDirectory.listNonEmptyTaskDirectories()) {
             try {
-                final TaskId id = TaskId.parseTaskDirectoryName(dir.getName(), null);
+                final TaskId id = parseTaskDirectoryName(dir.getName(), null);
                 if (stateDirectory.lock(id)) {
                     lockedTaskDirectories.add(id);
                     if (!tasks.owned(id)) {

View File

@@ -41,6 +41,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.readTaskIdFrom;
+import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.writeTaskIdTo;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.UNKNOWN;
@@ -207,14 +209,14 @@ public class AssignmentInfo {
         // encode active tasks
         out.writeInt(activeTasks.size());
         for (final TaskId id : activeTasks) {
-            id.writeTo(out, usedVersion);
+            writeTaskIdTo(id, out, usedVersion);
         }
 
         // encode standby tasks
         out.writeInt(standbyTasks.size());
         for (final Map.Entry<TaskId, Set<TopicPartition>> entry : standbyTasks.entrySet()) {
             final TaskId id = entry.getKey();
-            id.writeTo(out, usedVersion);
+            writeTaskIdTo(id, out, usedVersion);
 
             final Set<TopicPartition> partitions = entry.getValue();
             writeTopicPartitions(out, partitions);
@@ -383,7 +385,7 @@ public class AssignmentInfo {
             final int count = in.readInt();
             assignmentInfo.activeTasks = new ArrayList<>(count);
             for (int i = 0; i < count; i++) {
-                assignmentInfo.activeTasks.add(TaskId.readFrom(in, assignmentInfo.usedVersion));
+                assignmentInfo.activeTasks.add(readTaskIdFrom(in, assignmentInfo.usedVersion));
             }
         }
@@ -392,7 +394,7 @@ public class AssignmentInfo {
             final int count = in.readInt();
             assignmentInfo.standbyTasks = new HashMap<>(count);
             for (int i = 0; i < count; i++) {
                final TaskId id = readTaskIdFrom(in, assignmentInfo.usedVersion);
-                final TaskId id = TaskId.readFrom(in, assignmentInfo.usedVersion);
+                final TaskId id = readTaskIdFrom(in, assignmentInfo.usedVersion);
                 assignmentInfo.standbyTasks.put(id, readTopicPartitions(in));
             }
         }

View File

@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.MIN_NAMED_TOPOLOGY_VERSION;
+
+/**
+ * Utility class for common assignment or consumer protocol utility methods such as de/serialization
+ */
+public class ConsumerProtocolUtils {
+
+    /**
+     * @throws IOException if cannot write to output stream
+     */
+    public static void writeTaskIdTo(final TaskId taskId, final DataOutputStream out, final int version) throws IOException {
+        out.writeInt(taskId.subtopology());
+        out.writeInt(taskId.partition());
+        if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
+            if (taskId.namedTopology() != null) {
+                out.writeInt(taskId.namedTopology().length());
+                out.writeChars(taskId.namedTopology());
+            } else {
+                out.writeInt(0);
+            }
+        }
+    }
+
+    /**
+     * @throws IOException if cannot read from input stream
+     */
+    public static TaskId readTaskIdFrom(final DataInputStream in, final int version) throws IOException {
+        final int subtopology = in.readInt();
+        final int partition = in.readInt();
+        final String namedTopology;
+        if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
+            final int numNamedTopologyChars = in.readInt();
+            final StringBuilder namedTopologyBuilder = new StringBuilder();
+            for (int i = 0; i < numNamedTopologyChars; ++i) {
+                namedTopologyBuilder.append(in.readChar());
+            }
+            namedTopology = namedTopologyBuilder.toString();
+        } else {
+            namedTopology = null;
+        }
+        return new TaskId(subtopology, partition, getNamedTopologyOrElseNull(namedTopology));
+    }
+
+    public static void writeTaskIdTo(final TaskId taskId, final ByteBuffer buf, final int version) {
+        buf.putInt(taskId.subtopology());
+        buf.putInt(taskId.partition());
+        if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
+            if (taskId.namedTopology() != null) {
+                buf.putInt(taskId.namedTopology().length());
+                for (final char c : taskId.namedTopology().toCharArray()) {
+                    buf.putChar(c);
+                }
+            } else {
+                buf.putInt(0);
+            }
+        }
+    }
+
+    public static TaskId readTaskIdFrom(final ByteBuffer buf, final int version) {
+        final int subtopology = buf.getInt();
+        final int partition = buf.getInt();
+        final String namedTopology;
+        if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
+            final int numNamedTopologyChars = buf.getInt();
+            final StringBuilder namedTopologyBuilder = new StringBuilder();
+            for (int i = 0; i < numNamedTopologyChars; ++i) {
+                namedTopologyBuilder.append(buf.getChar());
+            }
+            namedTopology = namedTopologyBuilder.toString();
+        } else {
+            namedTopology = null;
+        }
+        return new TaskId(subtopology, partition, getNamedTopologyOrElseNull(namedTopology));
+    }
+
+    /**
+     * @return the namedTopology name, or null if the passed in namedTopology is null or the empty string
+     */
+    private static String getNamedTopologyOrElseNull(final String namedTopology) {
+        return (namedTopology == null || namedTopology.length() == 0) ?
+            null :
+            namedTopology;
+    }
+}
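
For reference, a round-trip sketch of the ByteBuffer variants above (the buffer capacity is illustrative, and the three-argument TaskId constructor is assumed to be accessible as in the TaskId diff above):

    final TaskId original = new TaskId(1, 3, "my-topology");
    final ByteBuffer buf = ByteBuffer.allocate(64);
    ConsumerProtocolUtils.writeTaskIdTo(original, buf, MIN_NAMED_TOPOLOGY_VERSION);
    buf.rewind();
    final TaskId decoded = ConsumerProtocolUtils.readTaskIdFrom(buf, MIN_NAMED_TOPOLOGY_VERSION);
    // decoded now carries the same subtopology, partition, and namedTopology as original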

View File

@@ -129,9 +129,9 @@ public class SubscriptionInfo {
         final Map<Integer, List<SubscriptionInfoData.PartitionToOffsetSum>> topicGroupIdToPartitionOffsetSum = new HashMap<>();
         for (final Map.Entry<TaskId, Long> taskEntry : taskOffsetSums.entrySet()) {
             final TaskId task = taskEntry.getKey();
-            topicGroupIdToPartitionOffsetSum.computeIfAbsent(task.topicGroupId, t -> new ArrayList<>()).add(
+            topicGroupIdToPartitionOffsetSum.computeIfAbsent(task.subtopology(), t -> new ArrayList<>()).add(
                 new SubscriptionInfoData.PartitionToOffsetSum()
-                    .setPartition(task.partition)
+                    .setPartition(task.partition())
                     .setOffsetSum(taskEntry.getValue()));
         }
 
@@ -157,14 +157,14 @@ public class SubscriptionInfo {
         data.setPrevTasks(prevTasks.stream().map(t -> {
             final SubscriptionInfoData.TaskId taskId = new SubscriptionInfoData.TaskId();
-            taskId.setTopicGroupId(t.topicGroupId);
-            taskId.setPartition(t.partition);
+            taskId.setTopicGroupId(t.subtopology());
+            taskId.setPartition(t.partition());
             return taskId;
         }).collect(Collectors.toList()));
         data.setStandbyTasks(standbyTasks.stream().map(t -> {
             final SubscriptionInfoData.TaskId taskId = new SubscriptionInfoData.TaskId();
-            taskId.setTopicGroupId(t.topicGroupId);
-            taskId.setPartition(t.partition);
+            taskId.setTopicGroupId(t.subtopology());
+            taskId.setPartition(t.partition());
             return taskId;
         }).collect(Collectors.toList()));
     }

View File

@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals.namedtopology;
-
-import org.apache.kafka.streams.processor.TaskId;
-
-public class NamedTaskId extends TaskId {
-
-    public NamedTaskId(final int topicGroupId, final int partition, final String namedTopology) {
-        super(topicGroupId, partition, namedTopology);
-        if (namedTopology == null) {
-            throw new IllegalStateException("NamedTopology is required for a NamedTaskId");
-        }
-    }
-
-    public String namedTopology() {
-        return namedTopology;
-    }
-
-    public static String namedTopology(final TaskId taskId) {
-        if (taskId instanceof NamedTaskId) {
-            return ((NamedTaskId) taskId).namedTopology();
-        } else {
-            return null;
-        }
-    }
-}

View File

@@ -232,7 +232,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> {
         changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
         updateBufferMetrics();
         open = true;
-        partition = context.taskId().partition;
+        partition = context.taskId().partition();
     }
 
     @Override

View File

@@ -54,7 +54,7 @@ public class StreamThreadStateStoreProvider {
             if (storeQueryParams.partition() != null) {
                 for (final Task task : tasks) {
-                    if (task.id().partition == storeQueryParams.partition() &&
+                    if (task.id().partition() == storeQueryParams.partition() &&
                         task.getStore(storeName) != null &&
                         storeName.equals(task.getStore(storeName).name())) {
                         final T typedStore = validateAndCastStores(task.getStore(storeName), queryableStoreType, storeName, task.id());

View File

@@ -499,7 +499,7 @@ public class ProcessorStateManagerTest {
         final TopicPartition changelogPartition = stateMgr.registeredChangelogPartitionFor(persistentStoreName);
         assertThat(changelogPartition.topic(), is(persistentStoreTopicName));
-        assertThat(changelogPartition.partition(), is(taskId.partition));
+        assertThat(changelogPartition.partition(), is(taskId.partition()));
     }
 
     @Test

View File

@@ -1682,7 +1682,7 @@ public class StreamThreadTest {
         final ThreadMetadata metadata = thread.threadMetadata();
         assertEquals(StreamThread.State.RUNNING.name(), metadata.threadState());
-        assertTrue(metadata.activeTasks().contains(new TaskMetadata(task1.toString(), Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
+        assertTrue(metadata.activeTasks().contains(new TaskMetadata(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
         assertTrue(metadata.standbyTasks().isEmpty());
 
         assertTrue("#threadState() was: " + metadata.threadState() + "; expected either RUNNING, STARTING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, or CREATED",
@@ -1735,7 +1735,7 @@ public class StreamThreadTest {
         final ThreadMetadata threadMetadata = thread.threadMetadata();
         assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState());
-        assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(task1.toString(), Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
+        assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(task1, Utils.mkSet(t1p1), new HashMap<>(), new HashMap<>(), Optional.empty())));
         assertTrue(threadMetadata.activeTasks().isEmpty());
 
         thread.taskManager().shutdown(true);

View File

@@ -830,7 +830,7 @@ public class StreamsPartitionAssignorTest {
             if (stateChangelogTopics.contains(changelogTopic)) {
                 for (final TaskId id : tasks) {
-                    if (id.topicGroupId == entry.getKey().nodeGroupId) {
+                    if (id.subtopology() == entry.getKey().nodeGroupId) {
                         ids.add(id);
                     }
                 }
@@ -2109,7 +2109,7 @@ public class StreamsPartitionAssignorTest {
             final Set<TopicPartition> partitions = entry.getValue();
             for (final TopicPartition partition : partitions) {
                 // since default grouper, taskid.partition == partition.partition()
-                assertEquals(id.partition, partition.partition());
+                assertEquals(id.partition(), partition.partition());
                 standbyTopics.add(partition.topic());
             }

View File

@@ -375,7 +375,7 @@ public final class AssignmentTestUtils {
             final UUID client = entry.getKey();
             final ClientState clientState = entry.getValue();
             for (final TaskId task : clientState.activeTasks()) {
-                final int subtopology = task.topicGroupId;
+                final int subtopology = task.subtopology();
                 subtopologyToClientsWithPartition
                     .computeIfAbsent(subtopology, initialClientCounts)
                     .get(client)

View File

@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.readTaskIdFrom;
+import static org.apache.kafka.streams.processor.internals.assignment.ConsumerProtocolUtils.writeTaskIdTo;
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 
 import org.apache.kafka.streams.errors.TaskAssignmentException;
@@ -168,7 +170,7 @@ public class LegacySubscriptionInfoSerde {
                                       final int version) {
         buf.putInt(taskIds.size());
         for (final TaskId id : taskIds) {
-            id.writeTo(buf, version);
+            writeTaskIdTo(id, buf, version);
         }
     }
 
@@ -233,7 +235,7 @@ public class LegacySubscriptionInfoSerde {
         final Set<TaskId> prevTasks = new HashSet<>();
         final int numPrevTasks = data.getInt();
         for (int i = 0; i < numPrevTasks; i++) {
-            prevTasks.add(TaskId.readFrom(data, version));
+            prevTasks.add(readTaskIdFrom(data, version));
         }
         return prevTasks;
     }

View File

@@ -733,7 +733,7 @@ public class StickyTaskAssignorTest {
         final List<Integer> topicGroupIds = new ArrayList<>();
         final Set<TaskId> activeTasks = clientStateEntry.getValue().activeTasks();
         for (final TaskId activeTask : activeTasks) {
-            topicGroupIds.add(activeTask.topicGroupId);
+            topicGroupIds.add(activeTask.subtopology());
         }
         Collections.sort(topicGroupIds);
         assertThat(topicGroupIds, equalTo(expectedTopicGroupIds));

View File

@@ -399,7 +399,7 @@ public class StreamThreadStateStoreProviderTest {
                                           final TaskId taskId) {
         final Metrics metrics = new Metrics();
         final LogContext logContext = new LogContext("test-stream-task ");
-        final Set<TopicPartition> partitions = Collections.singleton(new TopicPartition(topicName, taskId.partition));
+        final Set<TopicPartition> partitions = Collections.singleton(new TopicPartition(topicName, taskId.partition()));
         final ProcessorStateManager stateManager = new ProcessorStateManager(
             taskId,
             Task.TaskType.ACTIVE,

View File

@@ -419,7 +419,7 @@ public class InternalMockProcessorContext
             key,
             value,
             null,
-            taskId().partition,
+            taskId().partition(),
             timestamp,
             BYTES_KEY_SERIALIZER,
             BYTEARRAY_VALUE_SERIALIZER);