mirror of https://github.com/apache/kafka.git
cleanup unnecessary comments
This commit is contained in:
parent
9dd75df09e
commit
9daf9cdb0f
|
@ -4215,39 +4215,30 @@ public class TaskManagerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldPunctuateActiveTasks() {
|
public void shouldPunctuateActiveTasks() {
|
||||||
// Create a StreamTask using the builder pattern for StateUpdater compatibility
|
|
||||||
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
|
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
|
||||||
.withInputPartitions(taskId00Partitions)
|
.withInputPartitions(taskId00Partitions)
|
||||||
.inState(State.RUNNING) // Task has completed restoration and is RUNNING
|
.inState(State.RUNNING)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// Mock punctuation methods to return true (indicating punctuation occurred)
|
|
||||||
when(task00.maybePunctuateStreamTime()).thenReturn(true);
|
when(task00.maybePunctuateStreamTime()).thenReturn(true);
|
||||||
when(task00.maybePunctuateSystemTime()).thenReturn(true);
|
when(task00.maybePunctuateSystemTime()).thenReturn(true);
|
||||||
|
|
||||||
// Mock TasksRegistry to include the running task as an active task
|
|
||||||
final TasksRegistry tasks = mock(TasksRegistry.class);
|
final TasksRegistry tasks = mock(TasksRegistry.class);
|
||||||
when(tasks.activeTasks()).thenReturn(Set.of(task00));
|
when(tasks.activeTasks()).thenReturn(Set.of(task00));
|
||||||
|
|
||||||
// Mock StateUpdater to indicate no tasks are currently being restored
|
|
||||||
// This simulates that all restoration has completed
|
|
||||||
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
|
when(stateUpdater.restoresActiveTasks()).thenReturn(false);
|
||||||
when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(false);
|
when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(false);
|
||||||
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of());
|
when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of());
|
||||||
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(List.of());
|
when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(List.of());
|
||||||
|
|
||||||
// Set up TaskManager with StateUpdater enabled
|
|
||||||
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
|
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
|
||||||
|
|
||||||
// Call checkStateUpdater to ensure coordination with StateUpdater is done
|
|
||||||
// This simulates the normal flow where StreamThread calls checkStateUpdater
|
|
||||||
assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
|
assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
|
||||||
|
|
||||||
// Verify punctuation calls both stream and system time punctuators
|
|
||||||
// one for stream and one for system time
|
// one for stream and one for system time
|
||||||
assertThat(taskManager.punctuate(), equalTo(2));
|
assertThat(taskManager.punctuate(), equalTo(2));
|
||||||
|
|
||||||
// Verify that both punctuation methods were called
|
|
||||||
verify(task00).maybePunctuateStreamTime();
|
verify(task00).maybePunctuateStreamTime();
|
||||||
verify(task00).maybePunctuateSystemTime();
|
verify(task00).maybePunctuateSystemTime();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue