MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide (#10755)

Quick followup to KIP-740 to actually deprecate this constructor, and update the upgrade guide with what we changed in KIP-740. I also noticed the TaskId#parse method had been modified previously, and should be re-added to the public TaskId class. It had no tests, so now it does

Reviewers: Matthias J. Sax <mjsax@confluent.io>, Luke Chen <showuon@gmail.com>
This commit is contained in:
A. Sophie Blee-Goldman 2021-05-26 10:35:12 -07:00 committed by GitHub
parent 38e8391a77
commit bbe170af70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 0 deletions

View File

@ -117,6 +117,14 @@
<p> <p>
We removed the default implementation of <code>RocksDBConfigSetter#close()</code>. We removed the default implementation of <code>RocksDBConfigSetter#close()</code>.
</p> </p>
<p>
The public <code>topicGroupId</code> and <code>partition</code> fields on TaskId have been deprecated and replaced with getters. Please migrate to using the new <code>TaskId.subtopology()</code>
(which replaces <code>topicGroupId</code>) and <code>TaskId.partition()</code> APIs instead. Also, the <code>TaskId#readFrom</code> and <code>TaskId#writeTo</code> methods have been deprecated
and will be removed, as they were never intended for public use. Finally, we have deprecated the <code>TaskMetadata.taskId()</code> method as well as the <code>TaskMetadata</code> constructor.
These have been replaced with APIs that better represent the task id as an actual <code>TaskId</code> object instead of a String. Please migrate to the new <code>TaskMetadata#getTaskId</code>
method. See <a href="https://cwiki.apache.org/confluence/x/vYTOCg">KIP-740</a> for more details.
</p>
<p> <p>
We removed the following deprecated APIs: We removed the following deprecated APIs:
</p> </p>

View File

@ -16,11 +16,14 @@
*/ */
package org.apache.kafka.streams.processor; package org.apache.kafka.streams.processor;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Objects; import java.util.Objects;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -80,6 +83,35 @@ public class TaskId implements Comparable<TaskId> {
return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition; return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition;
} }
/**
* @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId}
*/
public static TaskId parse(final String taskIdStr) {
final int firstIndex = taskIdStr.indexOf('_');
final int secondIndex = taskIdStr.indexOf('_', firstIndex + 1);
if (firstIndex <= 0 || firstIndex + 1 >= taskIdStr.length()) {
throw new TaskIdFormatException(taskIdStr);
}
try {
// If only one copy of '_' exists, there is no named topology in the string
if (secondIndex < 0) {
final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, firstIndex));
final int partition = Integer.parseInt(taskIdStr.substring(firstIndex + 1));
return new TaskId(topicGroupId, partition);
} else {
final String namedTopology = taskIdStr.substring(0, firstIndex);
final int topicGroupId = Integer.parseInt(taskIdStr.substring(firstIndex + 1, secondIndex));
final int partition = Integer.parseInt(taskIdStr.substring(secondIndex + 1));
return new TaskId(topicGroupId, partition, namedTopology);
}
} catch (final Exception e) {
throw new TaskIdFormatException(taskIdStr);
}
}
/** /**
* @throws IOException if cannot write to output stream * @throws IOException if cannot write to output stream
* @deprecated since 3.0, for internal use, will be removed * @deprecated since 3.0, for internal use, will be removed

View File

@ -40,6 +40,19 @@ public class TaskMetadata {
private final Optional<Long> timeCurrentIdlingStarted; private final Optional<Long> timeCurrentIdlingStarted;
/**
* @deprecated since 3.0, not intended for public use
*/
@Deprecated
public TaskMetadata(final String taskId,
final Set<TopicPartition> topicPartitions,
final Map<TopicPartition, Long> committedOffsets,
final Map<TopicPartition, Long> endOffsets,
final Optional<Long> timeCurrentIdlingStarted) {
this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted);
}
// For internal use -- not a public API
public TaskMetadata(final TaskId taskId, public TaskMetadata(final TaskId taskId,
final Set<TopicPartition> topicPartitions, final Set<TopicPartition> topicPartitions,
final Map<TopicPartition, Long> committedOffsets, final Map<TopicPartition, Long> committedOffsets,

View File

@ -148,6 +148,18 @@ public class StateDirectoryTest {
} }
} }
@Test
public void shouldParseUnnamedTaskId() {
final TaskId task = new TaskId(1, 0);
assertThat(TaskId.parse(task.toString()), equalTo(task));
}
@Test
public void shouldParseNamedTaskId() {
final TaskId task = new TaskId(1, 0, "namedTopology");
assertThat(TaskId.parse(task.toString()), equalTo(task));
}
@Test @Test
public void shouldCreateTaskStateDirectory() { public void shouldCreateTaskStateDirectory() {
final TaskId taskId = new TaskId(0, 0); final TaskId taskId = new TaskId(0, 0);