This pull request is an attempt to get what has started in #12524 to completion as part of the Streams project migration to Mockito.
Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>
The following race can happen in the state updater code path
Task is restoring, owned by state updater
We fall out of the consumer group, lose all partitions
We therefore register a "TaskManager.pendingUpdateAction", to CLOSE_DIRTY
We also register a "StateUpdater.taskAndAction" to remove the task
We get the same task reassigned. Since it's still owned by the state updater, we don't do much
The task completes restoration
The "StateUpdater.taskAndAction" to remove will be ignored, since it's already restored
Inside "handleRestoredTasksFromStateUpdater", we close the task dirty because of the pending update action
We now have the task assigned, but it's closed.
To fix this particular race, we cancel the "close" pending update action. Furthermore, since we may have made progress in other threads during the missed rebalance, we need to add the task back to the state updater, to at least check if we are still at the end of the changelog. Finally, it seems we do not need to close dirty here, it's enough to close clean when we lose the task, related to KAFKA-10532.
This should fix the flaky EOSIntegrationTest.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Implements KIP-992.
Adds TimestampedKeyQuery and TimestampedRangeQuery (IQv2) for ts-ks-store, plus changes semantics of existing KeyQuery and RangeQuery if issues against a ts-kv-store, now unwrapping value-and-timestamp and only returning the plain value.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This interface provides a common supertype for `StreamThread` and
`DefaultTaskExecutor.TaskExecutorThread`, which will be used by KIP-892
to differentiate between "processing" threads and interactive query
threads.
This is needed because `DefaultTaskExecutor.TaskExecutorThread` is
`private`, so cannot be seen directly from `RocksDBStore`.
Reviewer: Bruno Cadonna <cadonna@apache.org>
Introduce a dummy node connected to every other node and run Bellman-ford from the dummy node once instead of from every node in the graph.
Reviewers: Qichao Chu (@ex172000), Matthias J. Sax <matthias@confluent.io>
Named topologies is a feature that is planned to be removed from AK with 4.0 and was never used via the public interface. It was used in a few versions of KSQL only, but was disabled there as well. While we do not want to remove it in 3.7 yet, we should disable flaky tests in that feature, that are disruptive to AK development.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Implementation for KIP-954: support custom DSL store providers
Testing Strategy:
- Updated the topology tests to ensure that the configuration is picked up in the topology builder
- Manually built a Kafka Streams application using a customer DslStoreSuppliers class and verified that it was used
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang.wang.us@gmail.com>
The state updater waits on a condition variable if no tasks exist that need to be updated. The condition variable is wrapped by a loop to account for spurious wake-ups. The check whether updating tasks exist is done in the condition of the loop. Actually, the state updater thread can change whether updating tasks exists, but since the state updater thread is waiting for the condition variable the check for the existence of updating tasks will always return the same value as long as the state updater thread is waiting. Thus, the check only need to be done once before entering the loop.
This commit moves check before the loop making also the usage of mocks more robust since the processing becomes more deterministic.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
A few variables which aren't being used anymore but still exist. This commit removes those unused variables.
Co-authored-by: Sagar Rao <sagarrao@Sagars-MacBook-Pro.local>
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
The consumer refactoring project introduced another `Consumer` implementation, creating two different, coexisting implementations of the `Consumer` interface:
* `KafkaConsumer` (AKA "existing", "legacy" consumer)
* `PrototypeAsyncConsumer` (AKA "new", "refactored" consumer)
The goal of this task is to refactor the code via the delegation pattern so that we can keep a top-level `KafkaConsumer` but then delegate to another implementation under the covers. There will be two delegates at first:
* `LegacyKafkaConsumer`
* `AsyncKafkaConsumer`
`LegacyKafkaConsumer` is essentially a renamed `KafkaConsumer`. That implementation handles the existing group protocol. `AsyncKafkaConsumer` is renamed from `PrototypeAsyncConsumer` and will implement the new consumer group protocol from KIP-848. Both of those implementations will live in the `internals` sub-package to discourage their use.
This task is part of the work to implement support for the new KIP-848 consumer group protocol.
Reviewers: Philip Nee <pnee@confluent.io>, Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
This patch fixes the compilation error for JDK 21 introduced in https://github.com/apache/kafka/pull/14708.
Reviewers: Ismael Juma <ismael@juma.me.uk>, David Jacot <djacot@confluent.io>
This is a follow up from #14659 that ports the windowed classes to use the StoreFactory abstraction as well. There's a side benefit of not duplicating the materialization code twice for each StreamImpl/CogroupedStreamImpl class as well.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>
The task-level commit metrics were removed without deprecation in KIP-447 and the corresponding PR #8218. However, we forgot to update the docs and to remove the code that creates the task-level commit sensor.
This PR removes the task-level commit metrics from the docs and removes the code that creates the task-level commit sensor. The code was effectively dead since it was only used in tests.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
When a task is corrupted, uncorrupted tasks are committed. That is also true for standby tasks. Committing standby tasks actually means that they are checkpointed.
When the state updater is enabled, standbys are owned by the state updater. The stream thread should not checkpoint them when handling corrupted tasks. That is not a big limitation since the state updater periodically checkpoints standbys anyway. Additionally, when handling corrupted tasks the important thing is to commit active running tasks to abort the transaction. Committing standby tasks do not abort any transaction.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This PR sets up the necessary prerequisites to respect configurations such as dsl.default.store.type and the dsl.store.suppliers.class (introduced in #14648) without requiring them to be passed in to StreamBuilder#new(TopologyConfig) (passing them only into new KafkaStreams(...).
It essentially makes StoreBuilder an external-only API and internally it uses the StoreFactory equivalent. It replaces KeyValueStoreMaterializer with an implementation of StoreFactory that creates the store builder only after configure() is called (in a Future PR we will create the missing equivalents for all of the places where the same thing happens for window stores, such as in all the *WindowKStreamImpl classes)
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
[KIP-962](https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams)
The key requirments got relaxed for the followinger streams dsl operator:
left join Kstream-Kstream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
outer join Kstream-Kstream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value.
left-foreign-key join Ktable-Ktable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value.
left join KStream-Ktable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right value.
Reviewers: Walker Carlson <wcarlson@apache.org>
Part of KIP-985.
Updates JavaDocs for `RangeQuery` and `ReadOnlyKeyValueStore` with regard to ordering guarantees.
Updates Kafka Streams upgrade guide with KIP information.
Reviewer: Matthias J. Sax <matthias@confluent.io>
Trying to fix flakiness for the shouldInvokeUserDefinedGlobalStateRestoreListener test introduced in #14519.
Fixes are:
-Do not use static membership.
-Always close the 2nd KafkaStreams instance.
-Await for the Kafka Streams instance to transition to a RUNNING state before proceeding.
-Added logging for the StateRestoreListener implementation.
-Reduce restore consumer MAX_POLL_RECORDS.
Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>
After finishing restoration, we should only log the active tasks. Standby tasks are not part of restoration and it can be confusing to see them show up on this log message.
Reviewers: Matthias J. Sax <matthias@confluent.io>
- Introduce a new internal config flag to enable processing threads
- If enabled, create a scheduling task manager inside the normal task manager (renamings will be added on top of this), and use it from the stream thread
- All operations inside the task manager that change task state, lock the corresponding tasks if processing threads are enabled.
- Adds a new abstract class AbstractPartitionGroup. We can modify the underlying implementation depending on the synchronization requirements. PartitionGroup is the unsynchronized subclass that is going to be used by the original code path. The processing thread code path uses a trivially synchronized SynchronizedPartitionGroup that uses object monitors. Further down the road, there is the opportunity to implement a weakly synchronized alternative. The details are complex, but since the implementation is essentially a queue + some other things, it should be feasible to implement this lock-free.
- Refactorings in StreamThreadTest: Make all tests use the thread member variable and add tearDown in order avoid thread leaks and simplify debugging. Make the test parameterized on two internal flags: state updater enabled and processing threads enabled. Use JUnit's assume to disable all tests that do not apply.
Enable some integration tests with processing threads enabled.
Reviewer: Bruno Cadonna <bruno@confluent.io>
When we get a suspended task re-assigned in the eager rebalance protocol, we have to add the task back to the state updater so that it has a chance to catch up with its change log.
This was prevented by a check in Tasks, which disallows removing SUSPENDED tasks from the task registry. I couldn't find a reason why this must be an invariant of the task registry, so this weakens the check.
The error happens in the integration between TaskRegistry and TaskManager. However, this change anyway adds unit tests to more closely specify the intended behavior of the two modules.
Reviewers: Bruno Cadonna <bruno@confluent.io>
We embrace immutability and thus should return a new object instead of
`this`, similar to other config classed we use in the DSL.
Side JavaDocs cleanup for a bunch of classes.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Spotbugs was temporarily disabled as part of KAFKA-15485 to support Kafka build with JDK 21. This PR upgrades the spotbugs version to 4.8.0 which adds support for JDK 21 and enables it's usage on build again.
Reviewers: Divij Vaidya <diviv@amazon.com>
When tasks are found corrupted, Kafka Streams tries to commit
the non-corrupted tasks before closing and reviving the corrupted
active tasks. Besides active running tasks, Kafka Streams tries
to commit restoring active tasks and standby tasks. However,
restoring active tasks do not need to be committed since they
do not have offsets to commit and the current code does not
write a checkpoint. Furthermore, trying to commit restoring
active tasks with the state updater enabled results in the
following error:
java.lang.UnsupportedOperationException: This task is read-only
at org.apache.kafka.streams.processor.internals.ReadOnlyTask.commitNeeded(ReadOnlyTask.java:209)
...
since commitNeeded() is not a read-only method for active tasks.
In future, we can consider writing a checkpoint for active
restoring tasks in this situation. Additionally, we should
fix commitNeeded() in active tasks to be read-only.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Fixes logging for KafkaStreams#streamThreadLeaveConsumerGroup.
In order not to lose the trace of the whole exception, passing Exception e as a second argument, while message is pre-formatted and passed as string as a first argument. With this, we won't loose the stack trace of the exception.
Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>
With https://issues.apache.org/jira/browse/KAFKA-10575 StateRestoreListener#onRestoreSuspended was added. But local tests show that it is never called because DelegatingStateRestoreListener was not updated to call a new method
Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>, Bruno Cadonna <cadonna@confluent.io>
Now that the implementation for the state updater is done, we can enable it by default.
This PR enables the state updater by default and fixes code that made assumptions that are not true when the state updater is enabled (mainly tests).
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Walker Carlson <wcarlson@confluent.io>
* Implements start and stop of task executors
* Introduce flush operation to keep consumer operations out of the processing threads
* Fixes corner case: handle requested unassignment during shutdown
* Fixes corner case: handle race between voluntary unassignment and requested unassigment
* Fixes corner case: task locking future completes for the empty set
* Fixes corner case: we should not reassign a task with an uncaught exception to a task executor
* Improved logging
* Number of threads controlled from outside, of the TaskManager
Reviewers: Bruno Cadonna <bruno@confluent.io>
When Streams completes a rebalance, it unlocks state directories
all unassigned tasks. Unfortunately, when the state updater is enabled,
Streams does not look into the state updater to determine the
unassigned tasks.
This commit corrects this by adding the check.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
The process method inside the tasks needs to be called from within
the processing threads. However, it currently interacts with the
consumer in two ways:
* It resumes processing when the PartitionGroup buffers are empty
* It fetches the lag from the consumer
We introduce updateLags() and
resumePollingForPartitionsWithAvailableSpace() methods that call into
the task from the polling thread, in order to set up the consumer
correctly for the next poll, and extract metadata from the consumer
after the poll.
Reviewer: Bruno Cadonna <bruno@confluent.io>
When a Streams application is subscribed with a pattern to
input topics and an input topic is deleted, the stream thread
transists to PARTITIONS_REVOKED and a rebalance is triggered.
This happens inside the poll call. Sometimes, the poll call
returns before a new assignment is received. That means, Streams
executes the poll loop in state PARTITIONS_REVOKED.
With the state updater enabled processing is also executed in states
other than RUNNING and so processing is also executed when the
stream thread is in state PARTITION_REVOKED. However, that triggers
an IllegalStateException with error message:
No current assignment for partition TEST-TOPIC-A-0
which is a fatal error.
This commit prevents processing when the stream thread is in state
PARTITIONS_REVOKED.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
State updater can get into a busy loop when all tasks are paused, because changelogReader will never return that all changelogs have been read completely. Fix this, by awaiting if updatingTasks is empty.
Related and included: if we are restoring and all tasks are paused, we should return immediately from StoreChangelogReader.
Reviewer: Bruno Cadonna <cadonna@apache.org>
The state directory throws a lock exception during initialization if a task state directory is still locked by the stream thread that previously owned the task. When this happens, Streams catches the lock exception, ignores the exception, and tries to initialize the task in the next exception.
In the state updater code path, we missed catching the lock exception when Streams recycles a task. That leads to the lock exception thrown to the exception handler, which is unexpected and leads to test failures.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
LogCaptureAppender sets the log level in various tests to check if a certain log message is produced. The log level is however never reverted, changing the log level across the board and introducing flakiness due to non-determinism since the log level depends on execution order. Some log messages change the timing inside tests significantly.
Reviewer: Bruno Cadonna <cadonna@apache.org>
This is one of the steps required for kafka to compile with Java 21.
For each case, one of the following fixes were applied:
1. Suppress warning if fixing would potentially result in an incompatible change (for public classes)
2. Add final to one or more methods so that the escape is not possible
3. Replace method calls with direct field access.
In addition, we also fix a couple of compiler warnings related to deprecated references in the `core` module.
See the following for more details regarding the new lint warning:
https://www.oracle.com/java/technologies/javase/21-relnote-issues.html#JDK-8015831
Reviewers: Divij Vaidya <diviv@amazon.com>, Satish Duggana <satishd@apache.org>, Chris Egerton <chrise@aiven.io>
All block cache metrics are being multiplied by the total number of
column families. In a `RocksDBTimestampedStore`, we have 2 column
families (the default, and the timestamped values), which causes all
block cache metrics in these stores to become doubled.
The cause is that our metrics recorder uses `getAggregatedLongProperty`
to fetch block cache metrics. `getAggregatedLongProperty` queries the
property on each column family in the database, and sums the results.
Since we always configure all column families to share the same block
cache, that causes the same block cache to be queried multiple times for
its metrics, with the results added togehter, effectively multiplying
the real value by the total number of column families.
To fix this, we should simply use `getLongProperty`, which queries a
single column family (the default one). Since all column families share
the same block cache, querying just one of them will give us the correct
metrics for that shared block cache.
Note: the same block cache is shared among all column families of a store
irrespective of whether the user has configured a shared block cache
across multiple stores.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <cadonna@apache.org>
Avoid busy waiting for processable tasks. We need to be a bit careful here to not have the task executors to sleep when work is available. We have to make sure to signal on the condition variable any time a task becomes "processable". Here are some situations where a task becomes processable:
- Task is unassigned from another TaskExecutor.
- Task state is changed (should only happen inside when a task is locked inside the polling phase).
- When tasks are unlocked.
- When tasks are added.
- New records available.
- A task is resumed.
So in summary, we
- We should probably lock tasks when they are paused and unlock them when they are resumed. We should also wake the task executors after every polling phase. This belongs to the StreamThread integration work (separate PR). We add DefaultTaskManager.signalProcessableTasks for this.
- We need to awake the task executors in DefaultTaskManager.unassignTask, DefaultTaskManager.unlockTasks and DefaultTaskManager.add.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@apache.org>
A mocked method is executed unexpectedly when we enable DEBUG
log level, leading to confusing test failures during debugging.
Since the log message itself seems useful, we adapt the test
to take the additional mocked method call into account).
Reviewer: Bruno Cadonna <cadonna@apache.org>
Resets the value of transactionInFlight to false when closing the
StreamsProducer. This ensures we don't try to commit against a
closed producer
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Preliminary fix for KAFKA-15429 which updates StreamThread.completeShutdown to
catch-and-log errors from consumer.unsubscribe. Though this does not prevent
the exception, it does preserve the original exception that caused the stream
thread to exit.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Implement setting and clearing task timeouts, as well as changing the output on exceptions to make
it similar to the existing code path.
Reviewer: Walker Carlson <wcarlson@apache.org>
Minor fix to avoid creating unnecessary standby tasks, especially when these may be surprising or unexpected as in the case of an application with num.standby.replicas = 0 and warmup replicas disabled.
The "bug" here was introduced during the fix for an issue with cooperative rebalancing and in-memory stores. The fundamental problem is that in-memory stores cannot be unassigned from a consumer for any period, however temporary, without being closed and losing all the accumulated state. This caused some grief when the new HA task assignor would assign an active task to a node based on the readiness of the standby version of that task, but would have to remove the active task from the initial assignment so it could first be revoked from its previous owner, as per the cooperative rebalancing protocol. This temporary gap in any version of that task among the consumer's assignment for that one intermediate rebalance would end up causing the consumer to lose all state for it, in the case of in-memory stores.
To fix this, we simply began to place standby tasks on the intended recipient of an active task awaiting revocation by another consumer. However, the fix was a bit of an overreach, as we assigned these temporary standby tasks in all cases, regardless of whether there had previously been a standby version of that task. We can narrow this down without sacrificing any of the intended functionality by only assigning this kind of standby task where the consumer had previously owned some version of it that would otherwise potentially be lost.
Also breaks up some of the long log lines in the StreamsPartitionAssignor and expands the summary info while moving it all to the front of the line (following reports of missing info due to truncation of long log lines in larger applications)
assertThrows makes the verification of exceptions clearer and more intuitive, thus improving code readability compared to the annotation approach. It is considered a test smell in the research literature. One possible research is due to developers not keeping up to date with recent versions of testing frameworks.
All such patterns in streams have been refactored.
Reviewers: vamossagar12 <sagarmeansocean@gmail.com>, Justine Olshan <jolshan@confluent.io>
A stream thread should only change to RUNNING if there are no
active tasks in restoration in the state updater and if there
are no pending tasks to recycle and to init.
Usually all pending tasks to init are added to the state updater
in the same poll iteration that handles the assignment. However,
if during an initialization of a task a LockException the task
is re-added to the tasks to init and initialization is retried
in the next poll iteration.
A LockException might occur when a state directory is still locked
by another thread, when the rebalance just happened.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Currently, Kafka Streams only tries to purge records whose
offset are committed from a repartition topic when at
least one offset was committed in the current commit.
The coupling between committing some offsets and purging
records is not needed and might delay purging of records.
For example, if a in-flight call for purging records has not
completed yet when a commit happens, a new call
is not issued.
If then the earlier in-flight call for purging records
finally completes but the next commit does not commit any
offsets, Streams does not issue the call for purging records
whose offset were committed in the previous commit
because the purging call was still in-flight.
This change issues calls for purging records during any commit
if the purge interval passed, even if no offsets were committed
in the current commit.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Walker Carlson <wcarlson@confluent.io>
KIP-904 introduced a backward incompatible change that requires a 2-bounce rolling upgrade.
The new "3.4" upgrade config value is not recognized by `AssignorConfiguration` though and thus crashed Kafka Streams if use.
Reviewers: Farooq Qaiser <fqaiser94@gmail.com>, Bruno Cadonna <bruno@confluent.io>
Reusing an admin client across tests can cause false positives in leak checkers, so don't do it.
Reviewers: Divij Vaidya <diviv@amazon.com>, Matthias J. Sax <matthias@confluent.io>
Implements punctuation inside processing threads. The scheduler
algorithm checks if a task that is not assigned currently can
be punctuated, and returns it when a worker thread asks for the
next task to be processed. Then, the processing thread runs all
punctuations in the punctionation queue.
Piggy-backed: take TaskExecutionMetadata into account when
processing records.
Reviewer: Bruno Cadonna <cadonna@apache.org>
Part of KIP-925.
- Add more tests for HighAvailabilityTaskAssignor
- Remove null and optional check for RackAwareTaskAssignor
- Pass rack aware assignor configs to getMainConsumerConfigs so that they can be picked up in rebalance protocol
- Change STATELESS_NON_OVERLAP_COST to 0. It was a mistake to be 1. Stateless tasks should be moved without this cost.
- Update of existing tests
Reviewers: Matthias J. Sax <matthias@confluent.io>
Change in response to KIP-941.
New PR due to merge issue.
Changes line 57 in the RangeQuery class file from:
public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
return new RangeQuery<>(Optional.of(lower), Optional.of(upper));
}
to
public static <K, V> RangeQuery<K, V> withRange(final K lower, final K upper) {
return new RangeQuery<>(Optional.ofNullable(lower), Optional.ofNullable(upper));
}
Testing strategy:
Since null values can now be entered in RangeQuerys in order to receive full scans, I changed the logic defining query starting at line 1085 in IQv2StoreIntegrationTest.java from:
final RangeQuery<Integer, V> query;
if (lower.isPresent() && upper.isPresent()) {
query = RangeQuery.withRange(lower.get(), upper.get());
} else if (lower.isPresent()) {
query = RangeQuery.withLowerBound(lower.get());
} else if (upper.isPresent()) {
query = RangeQuery.withUpperBound(upper.get());
} else {
query = RangeQuery.withNoBounds();
}
to
query = RangeQuery.withRange(lower.orElse(null), upper.orElse(null));
because different combinations of isPresent() in the bounds is no longer necessary.
Reviewers: John Roesler <vvcephei@apache.org>, Bill Bejeck <bbejeck@apache.org>
Part of KIP-915.
- Change TaskAssignor interface to take RackAwareTaskAssignor
- Integrate RackAwareTaskAssignor to StreamsPartitionAssignor and HighAvailabilityTaskAssignor
- Update HAAssignor tests
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias J. Sax <matthias@confluent.io>
Part of KIP-925.
- Add configs for rack aware assignor
- Update standby optimizer in RackAwareTaskAssignor to have more rounds
- Refactor some method in RackAwareTaskAssignorTest to test utils so that they can also be used in HighAvailabilityTaskAssignorTest and other tests
Reviewers: Matthias J. Sax <matthias@confluent.io>