KAFKA-16331: Remove task producers from Kafka Streams (#17344)

With EOSv1 removal, we don't have producer-per-task any longer,
and thus can remove the corresponding code which handles task producers.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2024-10-08 15:36:05 -07:00 committed by GitHub
parent 435e9851d2
commit 8d186bfb4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 62 additions and 283 deletions

View File

@ -44,7 +44,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode;
import static org.apache.kafka.streams.processor.internals.ClientUtils.producerClientId;
@ -63,9 +62,7 @@ class ActiveTaskCreator {
private final UUID processId;
private final Logger log;
private final Sensor createTaskSensor;
private final StreamsProducer threadProducer;
// TODO remove `taskProducers`
private final Map<TaskId, StreamsProducer> taskProducers = Collections.emptyMap();
private final StreamsProducer streamsProducer;
private final ProcessingMode processingMode;
private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
@ -105,7 +102,7 @@ class ActiveTaskCreator {
final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
final LogContext logContext = new LogContext(threadIdPrefix);
threadProducer = new StreamsProducer(
streamsProducer = new StreamsProducer(
processingMode,
producer(),
logContext,
@ -124,27 +121,12 @@ class ActiveTaskCreator {
return clientSupplier.getProducer(producerConfig);
}
public void reInitializeThreadProducer() {
threadProducer.resetProducer(producer());
public void reInitializeProducer() {
streamsProducer.resetProducer(producer());
}
StreamsProducer streamsProducerForTask(final TaskId taskId) {
if (processingMode != EXACTLY_ONCE_ALPHA) {
throw new IllegalStateException("Expected EXACTLY_ONCE to be enabled, but the processing mode was " + processingMode);
}
final StreamsProducer taskProducer = taskProducers.get(taskId);
if (taskProducer == null) {
throw new IllegalStateException("Unknown TaskId: " + taskId);
}
return taskProducer;
}
StreamsProducer threadProducer() {
if (processingMode == EXACTLY_ONCE_ALPHA) {
throw new IllegalStateException("Expected AT_LEAST_ONCE or EXACTLY_ONCE_V2 to be enabled, but the processing mode was " + processingMode);
}
return threadProducer;
StreamsProducer streamsProducer() {
return streamsProducer;
}
// TODO: convert to StreamTask when we remove TaskManager#StateMachineTask with mocks
@ -198,7 +180,7 @@ class ActiveTaskCreator {
return new RecordCollectorImpl(
logContext,
taskId,
this.threadProducer,
streamsProducer,
applicationConfig.productionExceptionHandler(),
streamsMetrics,
topology
@ -274,28 +256,16 @@ class ActiveTaskCreator {
return task;
}
// TODO: rename and revisit test
void closeThreadProducerIfNeeded() {
void close() {
try {
threadProducer.close();
streamsProducer.close();
} catch (final RuntimeException e) {
throw new StreamsException("Thread producer encounter error trying to close.", e);
}
}
void closeAndRemoveTaskProducerIfNeeded(final TaskId id) {
final StreamsProducer taskProducer = taskProducers.remove(id);
if (taskProducer != null) {
try {
taskProducer.close();
} catch (final RuntimeException e) {
throw new StreamsException("[" + id + "] task producer encounter error trying to close.", e, id);
}
}
}
Map<MetricName, Metric> producerMetrics() {
return ClientUtils.producerMetrics(Collections.singleton(threadProducer));
return ClientUtils.producerMetrics(Collections.singleton(streamsProducer));
}
String producerClientIds() {
@ -309,6 +279,6 @@ class ActiveTaskCreator {
}
public double totalProducerBlockedTime() {
return threadProducer.totalBlockedTime();
return streamsProducer.totalBlockedTime();
}
}

View File

@ -812,7 +812,7 @@ public class StreamThread extends Thread implements ProcessingThread {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
threadProducerInstanceIdFuture.complete(
taskManager.threadProducer().kafkaProducer().clientInstanceId(Duration.ZERO)
taskManager.streamsProducer().kafkaProducer().clientInstanceId(Duration.ZERO)
);
} catch (final IllegalStateException disabledError) {
// if telemetry is disabled on a client, we swallow the error,

View File

@ -38,8 +38,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
/**
@ -177,30 +175,13 @@ public class TaskExecutor {
final Set<TaskId> corruptedTasks = new HashSet<>();
if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
for (final Task task : taskManager.activeRunningTaskIterable()) {
final Map<TopicPartition, OffsetAndMetadata> taskOffsetsToCommit = offsetsPerTask.getOrDefault(task, emptyMap());
if (!taskOffsetsToCommit.isEmpty() || taskManager.streamsProducerForTask(task.id()).transactionInFlight()) {
try {
taskManager.streamsProducerForTask(task.id())
.commitTransaction(taskOffsetsToCommit, taskManager.consumerGroupMetadata());
updateTaskCommitMetadata(taskOffsetsToCommit);
} catch (final TimeoutException timeoutException) {
log.error(
String.format("Committing task %s failed.", task.id()),
timeoutException
);
corruptedTasks.add(task.id());
}
}
}
} else if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
if (!offsetsPerTask.isEmpty() || taskManager.threadProducer().transactionInFlight()) {
if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
if (!offsetsPerTask.isEmpty() || taskManager.streamsProducer().transactionInFlight()) {
final Map<TopicPartition, OffsetAndMetadata> allOffsets = offsetsPerTask.values().stream()
.flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
try {
taskManager.threadProducer().commitTransaction(allOffsets, taskManager.consumerGroupMetadata());
taskManager.streamsProducer().commitTransaction(allOffsets, taskManager.consumerGroupMetadata());
updateTaskCommitMetadata(allOffsets);
} catch (final TimeoutException timeoutException) {
log.error(

View File

@ -105,7 +105,7 @@ public class TaskManager {
// includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance
private final Set<TaskId> lockedTaskDirectories = new HashSet<>();
private Map<TaskId, BackoffRecord> taskIdToBackoffRecord = new HashMap<>();
private final Map<TaskId, BackoffRecord> taskIdToBackoffRecord = new HashMap<>();
private final ActiveTaskCreator activeTaskCreator;
private final StandbyTaskCreator standbyTaskCreator;
@ -173,12 +173,8 @@ public class TaskManager {
mainConsumer.commitSync(offsets);
}
StreamsProducer streamsProducerForTask(final TaskId taskId) {
return activeTaskCreator.streamsProducerForTask(taskId);
}
StreamsProducer threadProducer() {
return activeTaskCreator.threadProducer();
StreamsProducer streamsProducer() {
return activeTaskCreator.streamsProducer();
}
boolean rebalanceInProgress() {
@ -829,9 +825,7 @@ public class TaskManager {
}
private StandbyTask convertActiveToStandby(final StreamTask activeTask, final Set<TopicPartition> partitions) {
final StandbyTask standbyTask = standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());
return standbyTask;
return standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
}
private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set<TopicPartition> partitions) {
@ -956,9 +950,6 @@ public class TaskManager {
try {
task.suspend();
task.closeClean();
if (task.isActive()) {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
}
} catch (final RuntimeException e) {
final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
"Attempting to close remaining tasks before re-throwing:", task.id());
@ -1228,7 +1219,7 @@ public class TaskManager {
removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit();
if (processingMode == EXACTLY_ONCE_V2) {
activeTaskCreator.reInitializeThreadProducer();
activeTaskCreator.reInitializeProducer();
}
}
@ -1433,10 +1424,6 @@ public class TaskManager {
if (removeFromTasksRegistry) {
tasks.removeTask(task);
}
if (task.isActive()) {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
}
} catch (final RuntimeException swallow) {
log.error("Error removing dirty task {}: {}", task.id(), swallow.getMessage());
}
@ -1445,9 +1432,6 @@ public class TaskManager {
private void closeTaskClean(final Task task) {
task.closeClean();
tasks.removeTask(task);
if (task.isActive()) {
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
}
}
void shutdown(final boolean clean) {
@ -1469,7 +1453,7 @@ public class TaskManager {
executeAndMaybeSwallow(
clean,
activeTaskCreator::closeThreadProducerIfNeeded,
activeTaskCreator::close,
e -> firstException.compareAndSet(null, e),
e -> log.warn("Ignoring an exception while closing thread producer.", e)
);

View File

@ -87,7 +87,7 @@ public class ActiveTaskCreatorTest {
@Test
public void shouldConstructProducerMetricsWithEosDisabled() {
shouldConstructThreadProducerMetric();
shouldConstructStreamsProducerMetric();
}
@Test
@ -100,26 +100,16 @@ public class ActiveTaskCreatorTest {
}
@Test
public void shouldCloseThreadProducerIfEosDisabled() {
public void shouldCloseIfEosDisabled() {
createTasks();
activeTaskCreator.closeThreadProducerIfNeeded();
activeTaskCreator.close();
assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
}
@Test
public void shouldNoOpCloseTaskProducerIfEosDisabled() {
createTasks();
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));
assertThat(mockClientSupplier.producers.get(0).closed(), is(false));
}
@Test
public void shouldReturnBlockedTimeWhenThreadProducer() {
public void shouldReturnBlockedTimeWhenStreamsProducer() {
final double blockedTime = 123.0;
createTasks();
final MockProducer<?, ?> producer = mockClientSupplier.producers.get(0);
@ -131,35 +121,23 @@ public class ActiveTaskCreatorTest {
// error handling
@Test
public void shouldFailOnStreamsProducerPerTaskIfEosDisabled() {
public void shouldReturnStreamsProducerIfAtLeastOnceIsEnabled() {
createTasks();
final IllegalStateException thrown = assertThrows(
IllegalStateException.class,
() -> activeTaskCreator.streamsProducerForTask(null)
);
assertThat(thrown.getMessage(), is("Expected EXACTLY_ONCE to be enabled, but the processing mode was AT_LEAST_ONCE"));
}
@Test
public void shouldReturnThreadProducerIfAtLeastOnceIsEnabled() {
createTasks();
final StreamsProducer threadProducer = activeTaskCreator.threadProducer();
final StreamsProducer threadProducer = activeTaskCreator.streamsProducer();
assertThat(mockClientSupplier.producers.size(), is(1));
assertThat(threadProducer.kafkaProducer(), is(mockClientSupplier.producers.get(0)));
}
@Test
public void shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosDisabled() {
public void shouldThrowStreamsExceptionOnErrorCloseIfEosDisabled() {
createTasks();
mockClientSupplier.producers.get(0).closeException = new RuntimeException("KABOOM!");
final StreamsException thrown = assertThrows(
StreamsException.class,
activeTaskCreator::closeThreadProducerIfNeeded
activeTaskCreator::close
);
assertThat(thrown.getMessage(), is("Thread producer encounter error trying to close."));
@ -173,13 +151,13 @@ public class ActiveTaskCreatorTest {
// functional test
@Test
public void shouldReturnThreadProducerIfEosV2Enabled() {
public void shouldReturnStreamsProducerIfEosV2Enabled() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();
final StreamsProducer threadProducer = activeTaskCreator.threadProducer();
final StreamsProducer threadProducer = activeTaskCreator.streamsProducer();
assertThat(mockClientSupplier.producers.size(), is(1));
assertThat(threadProducer.kafkaProducer(), is(mockClientSupplier.producers.get(0)));
@ -190,7 +168,7 @@ public class ActiveTaskCreatorTest {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
shouldConstructThreadProducerMetric();
shouldConstructStreamsProducerMetric();
}
@Test
@ -205,48 +183,20 @@ public class ActiveTaskCreatorTest {
}
@Test
public void shouldCloseThreadProducerIfEosV2Enabled() {
public void shouldCloseIfEosV2Enabled() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();
activeTaskCreator.closeThreadProducerIfNeeded();
activeTaskCreator.close();
assertThat(mockClientSupplier.producers.get(0).closed(), is(true));
}
@Test
public void shouldNoOpCloseTaskProducerIfEosV2Enabled() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));
assertThat(mockClientSupplier.producers.get(0).closed(), is(false));
}
// error handling
@Test
public void shouldFailOnStreamsProducerPerTaskIfEosV2Enabled() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();
final IllegalStateException thrown = assertThrows(
IllegalStateException.class,
() -> activeTaskCreator.streamsProducerForTask(null)
);
assertThat(thrown.getMessage(), is("Expected EXACTLY_ONCE to be enabled, but the processing mode was EXACTLY_ONCE_V2"));
}
@Test
public void shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosV2Enabled() {
public void shouldThrowStreamsExceptionOnErrorCloseIfEosV2Enabled() {
properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
mockClientSupplier.setApplicationIdForProducer("appId");
createTasks();
@ -254,14 +204,14 @@ public class ActiveTaskCreatorTest {
final StreamsException thrown = assertThrows(
StreamsException.class,
activeTaskCreator::closeThreadProducerIfNeeded
activeTaskCreator::close
);
assertThat(thrown.getMessage(), is("Thread producer encounter error trying to close."));
assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
}
private void shouldConstructThreadProducerMetric() {
private void shouldConstructStreamsProducerMetric() {
createTasks();
final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>());

View File

@ -51,7 +51,7 @@ public class TaskExecutorTest {
final TaskExecutionMetadata metadata = mock(TaskExecutionMetadata.class);
final StreamsProducer producer = mock(StreamsProducer.class);
when(metadata.processingMode()).thenReturn(EXACTLY_ONCE_V2);
when(taskManager.threadProducer()).thenReturn(producer);
when(taskManager.streamsProducer()).thenReturn(producer);
when(producer.transactionInFlight()).thenReturn(true);
final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager, metadata, new LogContext());

View File

@ -442,7 +442,6 @@ public class TaskManagerTest {
verify(activeTaskToClose).suspend();
verify(activeTaskToClose).closeClean();
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@ -464,7 +463,6 @@ public class TaskManagerTest {
verify(activeTaskToClose).prepareCommit();
verify(activeTaskToClose).suspend();
verify(activeTaskToClose).closeDirty();
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}
@ -898,7 +896,6 @@ public class TaskManagerTest {
verify(tasks).addPendingTasksToInit(Collections.singleton(recycledActiveTask));
verify(activeTaskToClose).suspend();
verify(activeTaskToClose).closeClean();
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
}
@ -985,7 +982,7 @@ public class TaskManagerTest {
}
@Test
public void shouldAddRecycledStandbyTaskfromActiveToPendingTasksToInitWithStateUpdaterEnabled() {
public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithStateUpdaterEnabled() {
final StreamTask activeTaskToRecycle = statefulTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
@ -1001,7 +998,6 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));
verify(activeTaskToRecycle).prepareCommit();
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
verify(tasks).addPendingTasksToInit(mkSet(standbyTask));
verify(tasks).removeTask(activeTaskToRecycle);
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
@ -1009,7 +1005,7 @@ public class TaskManagerTest {
}
@Test
public void shouldAddRecycledStandbyTaskfromActiveToTaskRegistryWithStateUpdaterDisabled() {
public void shouldAddRecycledStandbyTasksFromActiveToTaskRegistryWithStateUpdaterDisabled() {
final StreamTask activeTaskToRecycle = statefulTask(taskId01, taskId01ChangelogPartitions)
.withInputPartitions(taskId01Partitions)
.inState(State.RUNNING).build();
@ -1025,7 +1021,6 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions)));
verify(activeTaskToRecycle).prepareCommit();
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
verify(tasks).replaceActiveWithStandby(standbyTask);
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
@ -1065,7 +1060,6 @@ public class TaskManagerTest {
taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap());
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
verify(activeTaskToClose).prepareCommit();
verify(activeTaskToClose).closeClean();
verify(tasks).removeTask(activeTaskToClose);
@ -2129,7 +2123,6 @@ public class TaskManagerTest {
assertThat(task00.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@ -2157,7 +2150,6 @@ public class TaskManagerTest {
is("Encounter unexpected fatal error for task 0_0")
);
assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@ -2203,48 +2195,15 @@ public class TaskManagerTest {
taskManager.handleRebalanceStart(emptySet());
assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
public void shouldReInitializeThreadProducerOnHandleLostAllIfEosV2Enabled() {
public void shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
taskManager.handleLostAll();
verify(activeTaskCreator).reInitializeThreadProducer();
}
@Test
public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
task00.setCommittableOffsetsAndMetadata(offsets);
// `handleAssignment`
when(consumer.assignment()).thenReturn(assignment);
when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00));
// `handleAssignment`
doThrow(new RuntimeException("KABOOM!")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
taskManager.handleAssignment(taskId00Assignment, emptyMap());
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
assertThat(task00.state(), is(Task.State.RUNNING));
taskManager.handleRevocation(taskId00Partitions);
final RuntimeException thrown = assertThrows(
RuntimeException.class,
() -> taskManager.handleAssignment(emptyMap(), emptyMap())
);
assertThat(
thrown.getMessage(),
is("Encounter unexpected fatal error for task 0_0")
);
assertThat(thrown.getCause(), instanceOf(RuntimeException.class));
assertThat(thrown.getCause().getMessage(), is("KABOOM!"));
verify(activeTaskCreator).reInitializeProducer();
}
@Test
@ -2552,7 +2511,7 @@ public class TaskManagerTest {
}
@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithAlos() {
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final StateMachineTask corruptedActive = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@ -2613,7 +2572,7 @@ public class TaskManagerTest {
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final AtomicBoolean corruptedTaskChangelogMarkedAsCorrupted = new AtomicBoolean(false);
@ -2691,7 +2650,7 @@ public class TaskManagerTest {
}
@Test
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithALOS() {
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithAlos() {
final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final Map<TopicPartition, OffsetAndMetadata> offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
revokedActiveTask.setCommittableOffsetsAndMetadata(offsets00);
@ -2745,7 +2704,7 @@ public class TaskManagerTest {
public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS() {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
final StateMachineTask revokedActiveTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
@ -3021,7 +2980,7 @@ public class TaskManagerTest {
when(activeTaskCreator.createTasks(any(), eq(assignmentActive)))
.thenReturn(asList(task00, task01, task02));
when(activeTaskCreator.threadProducer()).thenReturn(producer);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
when(standbyTaskCreator.createTasks(assignmentStandby))
.thenReturn(singletonList(task10));
@ -3176,7 +3135,6 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), emptyMap());
assertThat(task00.state(), is(Task.State.CLOSED));
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@ -3206,7 +3164,7 @@ public class TaskManagerTest {
@Test
public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithExactlyOnceV2() {
when(activeTaskCreator.threadProducer()).thenReturn(mock(StreamsProducer.class));
when(activeTaskCreator.streamsProducer()).thenReturn(mock(StreamsProducer.class));
shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(ProcessingMode.EXACTLY_ONCE_V2);
}
@ -3315,62 +3273,12 @@ public class TaskManagerTest {
assertThat(task03.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(activeTaskCreator, times(4)).closeAndRemoveTaskProducerIfNeeded(any());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
verify(activeTaskCreator).closeThreadProducerIfNeeded();
verify(activeTaskCreator).close();
}
@Test
public void shouldCloseActiveTasksAndPropagateTaskProducerExceptionsOnCleanShutdown() {
final TopicPartition changelog = new TopicPartition("changelog", 0);
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
mkEntry(taskId00, taskId00Partitions)
);
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@Override
public Set<TopicPartition> changelogPartitions() {
return singleton(changelog);
}
};
final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
task00.setCommittableOffsetsAndMetadata(offsets);
when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00));
doThrow(new RuntimeException("whatever"))
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
taskManager.handleAssignment(assignment, emptyMap());
assertThat(task00.state(), is(Task.State.CREATED));
taskManager.tryToCompleteRestoration(time.milliseconds(), null);
assertThat(task00.state(), is(Task.State.RESTORING));
assertThat(
taskManager.activeTaskMap(),
Matchers.equalTo(
mkMap(
mkEntry(taskId00, task00)
)
)
);
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(changeLogReader).enforceRestoreActive();
verify(changeLogReader).completedChangelogs();
final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.shutdown(true));
assertThat(task00.state(), is(Task.State.CLOSED));
assertThat(exception.getCause().getMessage(), is("whatever"));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
// the active task creator should also get closed (so that it closes the thread producer if applicable)
verify(activeTaskCreator).closeThreadProducerIfNeeded();
}
@Test
public void shouldCloseActiveTasksAndPropagateThreadProducerExceptionsOnCleanShutdown() {
public void shouldCloseActiveTasksAndPropagateStreamsProducerExceptionsOnCleanShutdown() {
final TopicPartition changelog = new TopicPartition("changelog", 0);
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
mkEntry(taskId00, taskId00Partitions)
@ -3383,7 +3291,7 @@ public class TaskManagerTest {
};
when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(singletonList(task00));
doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded();
doThrow(new RuntimeException("whatever")).when(activeTaskCreator).close();
taskManager.handleAssignment(assignment, emptyMap());
@ -3411,7 +3319,7 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
verify(activeTaskCreator).close();
}
@Test
@ -3505,8 +3413,7 @@ public class TaskManagerTest {
};
when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01, task02));
doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
doThrow(new RuntimeException("whatever all")).when(activeTaskCreator).closeThreadProducerIfNeeded();
doThrow(new RuntimeException("whatever")).when(activeTaskCreator).close();
taskManager.handleAssignment(assignment, emptyMap());
@ -3540,9 +3447,8 @@ public class TaskManagerTest {
assertThat(task02.state(), is(Task.State.CLOSED));
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
verify(activeTaskCreator, times(3)).closeAndRemoveTaskProducerIfNeeded(any());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
verify(activeTaskCreator).closeThreadProducerIfNeeded();
verify(activeTaskCreator).close();
}
@Test
@ -3566,7 +3472,7 @@ public class TaskManagerTest {
assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
// the active task creator should also get closed (so that it closes the thread producer if applicable)
verify(activeTaskCreator).closeThreadProducerIfNeeded();
verify(activeTaskCreator).close();
// `tryToCompleteRestoration`
verify(consumer).assignment();
verify(consumer).resume(eq(emptySet()));
@ -3588,8 +3494,7 @@ public class TaskManagerTest {
taskManager.shutdown(true);
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(failedStatefulTask.id());
verify(activeTaskCreator).closeThreadProducerIfNeeded();
verify(activeTaskCreator).close();
verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
verify(failedStatefulTask).prepareCommit();
verify(failedStatefulTask).suspend();
@ -3668,19 +3573,15 @@ public class TaskManagerTest {
verify(removedFailedStatefulTask).prepareCommit();
verify(removedFailedStatefulTask).suspend();
verify(removedFailedStatefulTask).closeDirty();
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId03);
verify(removedFailedStandbyTask).prepareCommit();
verify(removedFailedStandbyTask).suspend();
verify(removedFailedStandbyTask).closeDirty();
verify(activeTaskCreator, never()).closeAndRemoveTaskProducerIfNeeded(taskId04);
verify(removedFailedStatefulTaskDuringRemoval).prepareCommit();
verify(removedFailedStatefulTaskDuringRemoval).suspend();
verify(removedFailedStatefulTaskDuringRemoval).closeDirty();
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId05);
verify(removedFailedStandbyTaskDuringRemoval).prepareCommit();
verify(removedFailedStandbyTaskDuringRemoval).suspend();
verify(removedFailedStandbyTaskDuringRemoval).closeDirty();
verify(activeTaskCreator, never()).closeAndRemoveTaskProducerIfNeeded(taskId00);
}
@Test
@ -3873,7 +3774,7 @@ public class TaskManagerTest {
@Test
public void shouldCommitViaProducerIfEosV2Enabled() {
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = singletonMap(t1p2, new OffsetAndMetadata(1L, null));
@ -3881,16 +3782,7 @@ public class TaskManagerTest {
allOffsets.putAll(offsetsT01);
allOffsets.putAll(offsetsT02);
shouldCommitViaProducerIfEosEnabled(ProcessingMode.EXACTLY_ONCE_V2, offsetsT01, offsetsT02);
verify(producer).commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
verifyNoMoreInteractions(producer);
}
private void shouldCommitViaProducerIfEosEnabled(final ProcessingMode processingMode,
final Map<TopicPartition, OffsetAndMetadata> offsetsT01,
final Map<TopicPartition, OffsetAndMetadata> offsetsT02) {
final TaskManager taskManager = setUpTaskManager(processingMode, false);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
task01.setCommittableOffsetsAndMetadata(offsetsT01);
@ -3904,6 +3796,9 @@ public class TaskManagerTest {
when(consumer.groupMetadata()).thenReturn(new ConsumerGroupMetadata("appId"));
taskManager.commitAll();
verify(producer).commitTransaction(allOffsets, new ConsumerGroupMetadata("appId"));
verifyNoMoreInteractions(producer);
}
@Test
@ -4608,7 +4503,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.threadProducer()).thenReturn(producer);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
final Map<TopicPartition, OffsetAndMetadata> offsetsT00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null));
final Map<TopicPartition, OffsetAndMetadata> offsetsT01 = singletonMap(t1p1, new OffsetAndMetadata(1L, null));
@ -4720,7 +4615,6 @@ public class TaskManagerTest {
taskManager.handleAssignment(taskId00Assignment, Collections.emptyMap());
taskManager.handleAssignment(Collections.emptyMap(), taskId00Assignment);
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
verify(activeTaskCreator).createTasks(any(), eq(emptyMap()));
verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap());
verifyNoInteractions(consumer);