From bbe170af701609180387ad4abbfaa2712936266d Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 26 May 2021 10:35:12 -0700 Subject: [PATCH] MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide (#10755) Quick followup to KIP-740 to actually deprecate this constructor, and update the upgrade guide with what we changed in KIP-740. I also noticed the TaskId#parse method had been modified previously, and should be re-added to the public TaskId class. It had no tests, so now it does Reviewers: Matthias J. Sax , Luke Chen --- docs/streams/upgrade-guide.html | 8 +++++ .../kafka/streams/processor/TaskId.java | 32 +++++++++++++++++++ .../kafka/streams/processor/TaskMetadata.java | 13 ++++++++ .../internals/StateDirectoryTest.java | 12 +++++++ 4 files changed, 65 insertions(+) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 3ae4573d207..892d025be44 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -117,6 +117,14 @@

We removed the default implementation of RocksDBConfigSetter#close().

+ +

+ The public topicGroupId and partition fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new TaskId.subtopology() + (which replaces topicGroupId) and TaskId.partition() APIs instead. Also, the TaskId#readFrom and TaskId#writeTo methods have been deprecated + and will be removed, as they were never intended for public use. Finally, we have deprecated the TaskMetadata.taskId() method as well as the TaskMetadata constructor. + These have been replaced with APIs that better represent the task id as an actual TaskId object instead of a String. Please migrate to the new TaskMetadata#getTaskId + method. See KIP-740 for more details. +

We removed the following deprecated APIs:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java index d53630225cc..f4d8349eadd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java @@ -16,11 +16,14 @@ */ package org.apache.kafka.streams.processor; +import org.apache.kafka.streams.errors.TaskIdFormatException; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +83,35 @@ public class TaskId implements Comparable { return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition; } + /** + * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId} + */ + public static TaskId parse(final String taskIdStr) { + final int firstIndex = taskIdStr.indexOf('_'); + final int secondIndex = taskIdStr.indexOf('_', firstIndex + 1); + if (firstIndex <= 0 || firstIndex + 1 >= taskIdStr.length()) { + throw new TaskIdFormatException(taskIdStr); + } + + try { + // If only one copy of '_' exists, there is no named topology in the string + if (secondIndex < 0) { + final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, firstIndex)); + final int partition = Integer.parseInt(taskIdStr.substring(firstIndex + 1)); + + return new TaskId(topicGroupId, partition); + } else { + final String namedTopology = taskIdStr.substring(0, firstIndex); + final int topicGroupId = Integer.parseInt(taskIdStr.substring(firstIndex + 1, secondIndex)); + final int partition = Integer.parseInt(taskIdStr.substring(secondIndex + 1)); + + return new TaskId(topicGroupId, partition, namedTopology); + } + } catch (final Exception e) { + throw new TaskIdFormatException(taskIdStr); + } + } + /** * @throws IOException if cannot write to output stream * @deprecated since 3.0, for internal use, will be removed diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java index a35b7281118..f5a5a695bfd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java @@ -40,6 +40,19 @@ public class TaskMetadata { private final Optional timeCurrentIdlingStarted; + /** + * @deprecated since 3.0, not intended for public use + */ + @Deprecated + public TaskMetadata(final String taskId, + final Set topicPartitions, + final Map committedOffsets, + final Map endOffsets, + final Optional timeCurrentIdlingStarted) { + this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted); + } + + // For internal use -- not a public API public TaskMetadata(final TaskId taskId, final Set topicPartitions, final Map committedOffsets, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index b17169c64f1..fb1a486b573 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -148,6 +148,18 @@ public class StateDirectoryTest { } } + @Test + public void shouldParseUnnamedTaskId() { + final TaskId task = new TaskId(1, 0); + assertThat(TaskId.parse(task.toString()), equalTo(task)); + } + + @Test + public void shouldParseNamedTaskId() { + final TaskId task = new TaskId(1, 0, "namedTopology"); + assertThat(TaskId.parse(task.toString()), equalTo(task)); + } + @Test public void shouldCreateTaskStateDirectory() { final TaskId taskId = new TaskId(0, 0);