KAFKA-14133: Migrate topology builder mock in TaskManagerTest to mockito (#13529)

1. Migrates topology builder mock in TaskManagerTest to mockito.

2. Replaces the unit test to verify if subscribed partitions are added
to topology metadata.

3. Modifies signatures of methods for adding subscribed partitions to
topology metadata to use sets instead of lists. This makes the
intent of the methods clearer and makes the tests more portable.

Reviewers: Christo Lolov <lolovc@amazon.com>, Matthias J. Sax <mjsax@apache.org>
This commit is contained in:
Bruno Cadonna 2023-05-02 14:00:34 +02:00 committed by GitHub
parent 21af1918ea
commit 141c76a2c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 50 additions and 73 deletions

View File

@ -2166,7 +2166,7 @@ public class InternalTopologyBuilder {
return !subscriptionUpdates.isEmpty();
}
synchronized void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
synchronized void addSubscribedTopicsFromAssignment(final Set<TopicPartition> partitions, final String logPrefix) {
if (usesPatternSubscription()) {
final Set<String> assignedTopics = new HashSet<>();
for (final TopicPartition topicPartition : partitions) {

View File

@ -314,7 +314,7 @@ public class TaskManager {
activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
topologyMetadata.addSubscribedTopicsFromAssignment(
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
activeTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()),
logPrefix
);

View File

@ -584,7 +584,7 @@ public class TopologyMetadata {
applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));
}
void addSubscribedTopicsFromAssignment(final List<TopicPartition> partitions, final String logPrefix) {
void addSubscribedTopicsFromAssignment(final Set<TopicPartition> partitions, final String logPrefix) {
applyToEachBuilder(b -> b.addSubscribedTopicsFromAssignment(partitions, logPrefix));
}

View File

@ -66,6 +66,9 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import java.io.File;
import java.util.Arrays;
@ -99,7 +102,6 @@ import static org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNA
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
@ -178,7 +180,7 @@ public class TaskManagerTest {
final java.util.function.Consumer<Set<TopicPartition>> noOpResetter = partitions -> { };
@Mock(type = MockType.STRICT)
@org.mockito.Mock
private InternalTopologyBuilder topologyBuilder;
@Mock(type = MockType.DEFAULT)
private StateDirectory stateDirectory;
@ -201,6 +203,9 @@ public class TaskManagerTest {
@Rule
public final TemporaryFolder testFolder = new TemporaryFolder();
@Rule
public final MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);
@Before
public void setUp() {
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, false);
@ -228,9 +233,6 @@ public class TaskManagerTest {
stateUpdaterEnabled ? stateUpdater : null
);
taskManager.setMainConsumer(consumer);
reset(topologyBuilder);
expect(topologyBuilder.hasNamedTopology()).andStubReturn(false);
expect(topologyBuilder.nodeToSourceTopics()).andStubReturn(emptyMap());
return taskManager;
}
@ -905,7 +907,6 @@ public class TaskManagerTest {
when(tasks.removePendingTaskToUpdateInputPartitions(task00.id())).thenReturn(taskId02Partitions);
when(tasks.removePendingTaskToUpdateInputPartitions(task01.id())).thenReturn(taskId03Partitions);
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
replay(topologyBuilder);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@ -970,6 +971,7 @@ public class TaskManagerTest {
expectLastCall().anyTimes();
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.removePendingTaskToCloseClean(taskToClose.id())).thenReturn(true);
when(tasks.removePendingTaskToCloseClean(argThat(taskId -> !taskId.equals(taskToClose.id())))).thenReturn(false);
when(tasks.removePendingTaskToRecycle(taskToRecycle0.id())).thenReturn(taskId00Partitions);
when(tasks.removePendingTaskToRecycle(taskToRecycle1.id())).thenReturn(taskId01Partitions);
when(tasks.removePendingTaskToRecycle(
@ -978,11 +980,11 @@ public class TaskManagerTest {
when(tasks.removePendingTaskToUpdateInputPartitions(taskToUpdateInputPartitions.id())).thenReturn(taskId04Partitions);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
taskManager.setMainConsumer(consumer);
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);
replay(activeTaskCreator, standbyTaskCreator, consumer);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter -> { });
verify(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer);
verify(activeTaskCreator, standbyTaskCreator, consumer);
Mockito.verify(convertedTask0).initializeIfNeeded();
Mockito.verify(convertedTask1).initializeIfNeeded();
Mockito.verify(stateUpdater).add(convertedTask0);
@ -1324,7 +1326,7 @@ public class TaskManagerTest {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
consumer.resume(statefulTask.inputPartitions());
replay(consumer, topologyBuilder);
replay(consumer);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@ -1404,7 +1406,7 @@ public class TaskManagerTest {
taskToCloseDirty,
taskToUpdateInputPartitions
));
replay(standbyTaskCreator, topologyBuilder);
replay(standbyTaskCreator);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@ -1528,21 +1530,24 @@ public class TaskManagerTest {
}
@Test
public void shouldIdempotentlyUpdateSubscriptionFromActiveAssignment() {
final TopicPartition newTopicPartition = new TopicPartition("topic2", 1);
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(mkEntry(taskId01, mkSet(t1p1, newTopicPartition)));
public void shouldAddSubscribedTopicsFromAssignmentToTopologyMetadata() {
final Map<TaskId, Set<TopicPartition>> activeTasksAssignment = mkMap(
mkEntry(taskId01, mkSet(t1p1)),
mkEntry(taskId02, mkSet(t1p2, t2p2))
);
final Map<TaskId, Set<TopicPartition>> standbyTasksAssignment = mkMap(
mkEntry(taskId03, mkSet(t1p3)),
mkEntry(taskId04, mkSet(t1p4))
);
expect(activeTaskCreator.createTasks(anyObject(), eq(activeTasksAssignment))).andStubReturn(emptyList());
expect(standbyTaskCreator.createTasks(eq(standbyTasksAssignment))).andStubReturn(Collections.emptySet());
replay(activeTaskCreator, standbyTaskCreator);
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(emptyList());
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
taskManager.handleAssignment(activeTasksAssignment, standbyTasksAssignment);
topologyBuilder.addSubscribedTopicsFromAssignment(eq(asList(t1p1, newTopicPartition)), anyString());
expectLastCall();
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder);
taskManager.handleAssignment(assignment, emptyMap());
verify(activeTaskCreator, topologyBuilder);
Mockito.verify(topologyBuilder).addSubscribedTopicsFromAssignment(Mockito.eq(mkSet(t1p1, t1p2, t2p2)), Mockito.anyString());
Mockito.verify(topologyBuilder, never()).addSubscribedTopicsFromAssignment(Mockito.eq(mkSet(t1p3, t1p4)), Mockito.anyString());
verify(activeTaskCreator, standbyTaskCreator);
}
@Test
@ -1827,8 +1832,6 @@ public class TaskManagerTest {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
expectLastCall();
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList());
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
// `handleRevocation`
consumer.commitSync(offsets);
@ -1838,7 +1841,7 @@ public class TaskManagerTest {
consumer.commitSync(offsets);
expectLastCall();
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@ -1869,10 +1872,8 @@ public class TaskManagerTest {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
expectLastCall();
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList());
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
taskManager.handleRevocation(taskId00Partitions);
@ -1899,8 +1900,6 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer, changeLogReader);
expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
makeTaskFolders(taskId00.toString(), taskId01.toString());
expectLockObtainedFor(taskId00, taskId01);
@ -1917,7 +1916,7 @@ public class TaskManagerTest {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
expectLastCall();
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@ -1963,8 +1962,6 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer, changeLogReader);
expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(emptyList());
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
// `handleAssignment`
consumer.commitSync(offsets);
@ -1972,7 +1969,7 @@ public class TaskManagerTest {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
expectLastCall().andThrow(new RuntimeException("KABOOM!"));
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@ -2015,10 +2012,8 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer, changeLogReader);
expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expect(consumer.assignment()).andReturn(taskId00Partitions);
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
@ -2055,10 +2050,8 @@ public class TaskManagerTest {
expectRestoreToBeCompleted(consumer, changeLogReader);
expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expect(consumer.assignment()).andReturn(taskId00Partitions);
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
@ -2093,14 +2086,12 @@ public class TaskManagerTest {
.andStubReturn(asList(corruptedTask, nonCorruptedTask));
expect(standbyTaskCreator.createTasks(anyObject()))
.andStubReturn(Collections.emptySet());
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expectRestoreToBeCompleted(consumer, changeLogReader);
expect(consumer.assignment()).andReturn(taskId00Partitions);
// check that we should not commit empty map either
consumer.commitSync(eq(emptyMap()));
expectLastCall().andStubThrow(new AssertionError("should not invoke commitSync when offset map is empty"));
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true));
@ -2137,10 +2128,8 @@ public class TaskManagerTest {
.andStubReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
expect(standbyTaskCreator.createTasks(anyObject()))
.andStubReturn(Collections.emptySet());
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expect(consumer.assignment()).andReturn(taskId00Partitions);
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(assignment, emptyMap());
@ -2173,12 +2162,10 @@ public class TaskManagerTest {
// handleAssignment
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby));
expect(activeTaskCreator.createTasks(anyObject(), eq(taskId01Assignment))).andStubReturn(singleton(runningNonCorruptedActive));
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expectRestoreToBeCompleted(consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId01Assignment, taskId00Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@ -2216,16 +2203,12 @@ public class TaskManagerTest {
assignment.putAll(taskId01Assignment);
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
topologyBuilder.addSubscribedTopicsFromMetadata(eq(singleton(topic1)), anyObject());
expectLastCall().anyTimes();
expectRestoreToBeCompleted(consumer, changeLogReader);
expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader, stateDirectory, stateManager);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader, stateDirectory, stateManager);
uncorruptedActive.setCommittableOffsetsAndMetadata(offsets);
@ -2272,8 +2255,6 @@ public class TaskManagerTest {
assignment.putAll(taskId01Assignment);
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expectRestoreToBeCompleted(consumer, changeLogReader);
@ -2282,7 +2263,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@ -2350,8 +2331,6 @@ public class TaskManagerTest {
assignment.putAll(taskId01Assignment);
expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActiveTask, uncorruptedActiveTask));
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
expectLastCall().anyTimes();
expectRestoreToBeCompleted(consumer, changeLogReader);
@ -2362,7 +2341,7 @@ public class TaskManagerTest {
expect(consumer.assignment()).andStubReturn(union(HashSet::new, taskId00Partitions, taskId01Partitions));
replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader, stateManager);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader, stateManager);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@ -2563,9 +2542,7 @@ public class TaskManagerTest {
expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andReturn(singletonList(task01)).anyTimes();
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
topologyBuilder.addSubscribedTopicsFromAssignment(eq(asList(t1p0)), anyString());
expectLastCall().anyTimes();
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader, topologyBuilder);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(taskId00Assignment, taskId01Assignment);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
@ -2895,10 +2872,8 @@ public class TaskManagerTest {
expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
expect(standbyTaskCreator.createTasks(eq(assignmentStandby))).andReturn(singletonList(task10));
expect(standbyTaskCreator.createTasks(eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
topologyBuilder.addSubscribedTopicsFromAssignment(eq(asList(t1p0)), anyString());
expectLastCall().anyTimes();
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader, topologyBuilder);
replay(activeTaskCreator, standbyTaskCreator, consumer, changeLogReader);
taskManager.handleAssignment(assignmentActive, assignmentStandby);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));

View File

@ -32,6 +32,7 @@ import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.mockito.quality.Strictness;
import java.io.Closeable;
import java.io.IOException;
@ -54,6 +55,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
public final class StreamsTestUtils {
private StreamsTestUtils() {}
@ -267,7 +269,7 @@ public final class StreamsTestUtils {
}
public static TaskBuilder<StreamTask> statelessTask(final TaskId taskId) {
final StreamTask task = mock(StreamTask.class);
final StreamTask task = mock(StreamTask.class, withSettings().strictness(Strictness.LENIENT));
when(task.changelogPartitions()).thenReturn(Collections.emptySet());
when(task.isActive()).thenReturn(true);
when(task.id()).thenReturn(taskId);
@ -276,7 +278,7 @@ public final class StreamsTestUtils {
public static TaskBuilder<StreamTask> statefulTask(final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
final StreamTask task = mock(StreamTask.class);
final StreamTask task = mock(StreamTask.class, withSettings().strictness(Strictness.LENIENT));
when(task.isActive()).thenReturn(true);
setupStatefulTask(task, taskId, changelogPartitions);
return new TaskBuilder<>(task);
@ -284,7 +286,7 @@ public final class StreamsTestUtils {
public static TaskBuilder<StandbyTask> standbyTask(final TaskId taskId,
final Set<TopicPartition> changelogPartitions) {
final StandbyTask task = mock(StandbyTask.class);
final StandbyTask task = mock(StandbyTask.class, withSettings().strictness(Strictness.LENIENT));
when(task.isActive()).thenReturn(false);
setupStatefulTask(task, taskId, changelogPartitions);
return new TaskBuilder<>(task);