This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR actually catches processing exceptions from punctuate.
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR catches the exceptions thrown while handling a processing exception.
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
As discussed in #16657 (comment), we should make loggers static to avoid creating multiple logger instances.
I used the regex `private.*Logger.*LoggerFactory` to search, and checked all the results for loggers that need to be static.
There are some exceptions where loggers don't need to be static:
1) The logger is in an inner class, since Java 8 doesn't support static fields in inner classes.
https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java#L3676
2) Custom loggers for each instance (non-static + non-final). In this case, multiple logger instances are actually needed.
https://github.com/apache/kafka/blob/trunk/storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java#L166
3) The logger is initialized in the constructor by LogContext. Many non-static but final loggers are in this category; that's why I used `.*LoggerFactory` to only check the loggers that are assigned an initial value at declaration.
4) `protected final Logger log = Logger.getLogger(getClass())`
This is so a subclass can log with the subclass name instead of the superclass name.
But in this case, if the log's access modifier is private, the purpose cannot be achieved, since the subclass cannot access the log defined in the superclass. So if the access modifier is private, we can replace getClass() with <className>.class.
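For illustration, a minimal sketch of both patterns (class name hypothetical), using SLF4J as the code base does:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SomeStreamsClass {
    // Preferred: one logger instance shared by all instances of the class.
    private static final Logger LOG = LoggerFactory.getLogger(SomeStreamsClass.class);

    // Case 4 above: non-private so that subclasses log under their own name.
    // If this field were private, subclasses could not see it, and we could
    // just as well use SomeStreamsClass.class and make the field static.
    protected final Logger log = LoggerFactory.getLogger(getClass());
}
```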
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This PR reduces the MAX_BLOCK_MS config which defaults to 60sec to
10sec, to avoid a race condition with the 60sec test timeout.
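For illustration, one plausible way such a test override could look (the helper name is hypothetical; `StreamsConfig.producerPrefix` and `ProducerConfig.MAX_BLOCK_MS_CONFIG` are the standard way to target the embedded producer):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TestConfig {
    public static Properties withShortMaxBlock(final Properties base) {
        final Properties props = new Properties();
        props.putAll(base);
        // Lower the producer's max.block.ms from its 60s default to 10s so it
        // gives up well before the 60s test timeout can fire.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 10_000);
        return props;
    }
}
```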
Reviewers: Bill Bejeck <bill@confluent.io>
This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR exposes the new ErrorHandlerContext as a parameter to the deserialization exception handlers and deprecates the previous handle signature.
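As a rough sketch of what an implementation against the new signature might look like (class name hypothetical, assuming the KIP-1033 `ErrorHandlerContext` accessors such as `topic()`, `partition()`, and `offset()`):

```java
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;

public class LogAndContinueOnBadRecord implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ErrorHandlerContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // The new context parameter carries the metadata of the failed record.
        System.err.printf("Skipping corrupt record at %s-%d@%d: %s%n",
            context.topic(), context.partition(), context.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
```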
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR exposes the new ErrorHandlerContext as a parameter to the production exception handler and deprecates the previous handle signature.
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
To avoid a resource leak, we need to close the AdminClient after the test.
Reviewers: Lianet Magrans <lianetmr@gmail.com>, Matthias J. Sax <matthias@confluent.io>
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: loicgreffier <loic.greffier@michelin.com>
Minor code improvements across different classes, related to the `ProcessingExceptionHandler` implementation (KIP-1033).
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR is part of KAFKA-16448 (KIP-1033) which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR actually catches processing exceptions.
Authors:
@Dabz
@sebastienviale
@loicgreffier
Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Bruno Cadonna <cadonna@apache.org>
When Streams tries to transit a restored active task to RUNNING, the
first thing it does is getting the committed offsets for this task.
If getting the offsets times out, Streams does not re-throw
the error initially, but tries to get the committed offsets later
until a Streams-specific timeout is hit.
Restored active tasks from the state updater are removed from the
output queue of the restored tasks in the state updater. If a
timeout occurs, the restored task is neither added to the
task registry nor re-added to the state updater. The task is lost
since it is not maintained anywhere. This means the task is also
not closed. When the same task is created again on the same
stream thread since the stream thread does not know about this
lost task, the state stores are opened again and RocksDB will
throw the "No locks available" error.
This commit re-adds the task to the state updater if the
request for committed offsets times out.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
When active tasks are revoked, they land as suspended tasks in the
task registry. If they are then reassigned, the tasks are resumed
and put into restoration. On assignment, we first handle the tasks
in the task registry and then the tasks in the state updater. That
means that if a task is re-assigned after a revocation, we remove
the suspended task from the task registry, resume it, add it
to the state updater, and then remove it from the list of tasks
to create. After that we iterate over the tasks in the state
updater and remove from there the tasks that are not in the list
of tasks to create. However, now the state updater contains the
resumed tasks that we removed from the task registry before but
are no longer in the list of tasks to create. In other words, we
remove the resumed tasks from the state updater and close them
although we just got them assigned.
This commit ensures that we first handle the tasks in the
state updater and then the tasks in the task registry.
Cherry-pick of 4ecbb75
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
With the state updater enabled, tasks that are created but not initialized
are stored in a set. In each poll iteration the stream thread drains
that set, initializes the tasks, and adds them to the state updater.
On partition lost, all active tasks are closed.
This commit ensures that active tasks pending initialization in
the set mentioned above are closed cleanly on partition lost.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
When the PurgeRepartitionTopicIntegrationTest was written, the InitialTaskDelayMs was hard-coded on the broker, requiring setting a timeout in the test to wait for the delay to expire. But I believe this creates a race condition where the test times out before the broker deletes the inactive segment. PR #15719 introduced an internal config to control the InitialTaskDelayMs config for speeding up tests, and this PR leverages this internal config to reduce the task delay to 0 to eliminate this race condition.
For a non-existing output topic, Kafka Streams ends up in an infinite retry loop, because the returned TimeoutException extends RetriableException.
This PR updates the error handling path for this case: instead of retrying, it calls the ProductionExceptionHandler to allow breaking the infinite retry loop.
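For illustration, a sketch of a custom handler that breaks the loop for this case (class name hypothetical; the handler interface and response enum are the existing public API):

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class FailOnTimeoutHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        // With this PR, a TimeoutException for a missing output topic reaches
        // the handler instead of being retried forever; FAIL stops processing.
        if (exception instanceof TimeoutException) {
            return ProductionExceptionHandlerResponse.FAIL;
        }
        // CONTINUE drops the record but keeps the application running.
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
```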
Reviewers: Matthias J. Sax <matthias@confluent.io>
Foreign key joins have an additional "internal" state store used for subscriptions, which is not exposed for configuration via Materialized or StoreBuilder, meaning there is no way to plug in a different store implementation via the DSL operator. However, we should respect the configured default DSL store supplier if one is configured, to allow these stores to be customized and to conform to the store type selection logic used for other DSL operator stores.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Each KafkaStreams instance maintains a map from threadId to state,
used to aggregate the overall KafkaStreams app state. The map is updated
on every state change, and when a new thread is created. State change
updates are done in synchronized blocks; however, the update that
happens on thread creation is not, which can raise a
ConcurrentModificationException. This patch moves this update
into the listener object and protects it using the object's lock.
It also moves ownership of the state map into the listener so that
it's less likely that future changes access it without locking.
Reviewers: Matthias J. Sax <matthias@confluent.io>
The new default standby task assignment in TaskAssignment should only assign standby tasks for changelogged tasks, not all stateful tasks.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
KStreamKStreamJoin was recently refactored to be type safe with regard to left and right value-types. This PR updates the corresponding test to use different value types instead of the same one to improve test coverage.
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
To avoid confusion in 3.8/until we fully remove all the old task assignors and internal config, we should rename the old internal assignor classes like the StickyTaskAssignor so that they won't be mixed up with the new version of the assignor (which is also named StickyTaskAssignor)
Reviewers: Bruno Cadonna <cadonna@apache.org>, Josep Prat <josep.prat@aiven.io>
Since the new public API for TaskAssignor shared a name, this rename will prevent users from confusing the internal definition with the public one.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Also moved the assignment validation test from StreamsPartitionAssignorTest to TaskAssignmentUtilsTest.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
We have already enabled the state updater by default once.
However, we ran into issues that forced us to disable it again.
We think that we fixed those issues. So we want to enable the
state updater again by default.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
We now provide a way to more easily customize the rack aware
optimizations that we provide by way of a configuration class called
RackAwareOptimizationParams.
We also simplified the APIs for the optimizeXYZ utility functions since
they were mutating the inputs anyway.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR brings ProcessingExceptionHandler in Streams configuration.
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: sebastienviale <sebastien.viale@michelin.com>
Reviewer: Bruno Cadonna <cadonna@apache.org>
The StreamsConfig class was not parsing the new task assignment
configuration flag properly, which made it impossible to properly
configure a custom task assignor.
This PR fixes this and adds a bit of INFO logging to help users diagnose
assignor misconfiguration issues.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Added more testing for the StickyTaskAssignor, which includes a large-scale test with rack aware enabled.
Also added a no-op change to StreamsAssignmentScaleTest.java to allow for rack aware optimization testing.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This PR updates all of the streams task assignment code to use the new AssignmentConfigs public class.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
A task does not know anything about a produce error thrown
by a different task. That might lead to an InvalidTxnStateException
when a task attempts to do a transactional operation on a producer
that failed due to a different task.
This commit stores the produce exception in the streams producer
on completion of a send instead of the record collector since the
record collector is on task level whereas the stream producer
is on stream thread level. Since all tasks use the same streams
producer the error should be correctly propagated across tasks
of the same stream thread.
For EOS alpha, this commit does not change anything because
each task uses its own producer. The send error is still
on task level, but so is the transaction.
Reviewers: Matthias J. Sax <matthias@confluent.io>
This PR is part of KAFKA-16448 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
This PR brings ProcessingExceptionHandler interface and default implementations.
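As a sketch of the shape the new interface takes per KIP-1033 (assuming the `ErrorHandlerContext` parameter and the CONTINUE/FAIL response enum described in the KIP):

```java
import java.util.Map;

import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                            final Record<?, ?> record,
                                            final Exception exception) {
        // Log and swallow the error; returning FAIL instead would stop the
        // stream thread, mirroring the deserialization/production handlers.
        System.err.printf("Error processing record from topic %s: %s%n",
            context.topic(), exception);
        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
```

The handler is then registered via the new `processing.exception.handler` Streams config introduced by the companion configuration PR.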
Co-authored-by: Dabz <d.gasparina@gmail.com>
Co-authored-by: sebastienviale <sebastien.viale@michelin.com>
Reviewer: Bruno Cadonna <cadonna@apache.org>
Fixed the calculation of the store name list based on the subtopology being accessed.
Also added a new test to make sure this new functionality works as intended.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This PR takes care of making the call back toTaskAssignor.onAssignmentComputed.
It also contains a change to the public AssignmentConfigs API, as well as some simplifications of the StickyTaskAssignor.
This PR also changes the rack information fetching to happen lazily in the case where the TaskAssignor makes its decisions without said rack information.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This fills in the implementation details of the standby task assignment utility functions within TaskAssignmentUtils.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This PR implements the rack aware optimization of active tasks that can be used by the assignors themselves. It takes in the full output of the assignment and tries to reorganize it so as to minimize cross-rack traffic.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This PR adds the logic and wiring necessary to make the callback to
TaskAssignor::onAssignmentComputed with the necessary parameters.
We also fixed some log statements in the actual assignment error
computation, as well as modified the ApplicationState::allTasks method
to return a Map instead of a Set of TaskInfos.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
The introduced changes provide a cleaner definition of the join side in KStreamKStreamJoin. Before, this was done by using a Boolean flag, which led to returning a raw LeftOrRightValue without generic arguments because the generic type arguments depended on the boolean input.
Reviewers: Greg Harris <greg.harris@aiven.io>, Bruno Cadonna <cadonna@apache.org>
This PR adds the post-processing of the TaskAssignment to figure out if the new assignment is valid, and return an AssignmentError otherwise.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This PR creates the new public config of KIP-924 in StreamsConfig and uses it to instantiate user-created TaskAssignors. If such a TaskAssignor is found and successfully created, we then use that assignor to perform the task assignment; otherwise we fall back to the pre-KIP-924 world with the internal task assignors.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Almog Gavra <almog@responsive.dev>
Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw).
This new `StateStore` metric tracks the timestamp that the oldest
surviving Iterator was created.
This timestamp should continue to climb, and closely track the current
time, as old iterators are closed and new ones created. If the timestamp
remains very low (i.e. old), that suggests an Iterator has leaked, which
should enable users to isolate the affected store.
It will report no data when there are no currently open Iterators.
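For example, a user could poll this metric via `KafkaStreams#metrics()` (a sketch; the metric name `oldest-iterator-open-since-ms` is taken from KIP-989, and the class name is hypothetical):

```java
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class IteratorLeakCheck {
    public static void printOldestOpenIterators(final KafkaStreams streams) {
        for (final Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
            if (e.getKey().name().equals("oldest-iterator-open-since-ms")) {
                // A value far in the past suggests a leaked iterator in that store.
                System.out.println(e.getKey().tags() + " -> " + e.getValue().metricValue());
            }
        }
    }
}
```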
Reviewers: Matthias J. Sax <matthias@confluent.io>
`LongAdder` performs better than `AtomicInteger` when under contention
from many threads. Since it's possible that many Interactive Query
threads could create a large number of `KeyValueIterator`s, we don't
want contention on a metric to be a performance bottleneck.
The trade-off is memory, as `LongAdder` uses more memory to space out
independent counters across different cache lines. In practice, I don't
expect this to cause too many problems, as we're only constructing one
per store.
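A minimal sketch of the pattern (class name hypothetical):

```java
import java.util.concurrent.atomic.LongAdder;

public class OpenIteratorCounter {
    // Increments from many IQ threads land on per-thread cells instead of
    // contending on a single CAS'd word, as an AtomicInteger would.
    private final LongAdder openIterators = new LongAdder();

    public void onOpen()  { openIterators.increment(); }
    public void onClose() { openIterators.decrement(); }

    // sum() folds the cells together and is only paid when the metric is read.
    public long value()   { return openIterators.sum(); }
}
```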
Reviewers: Matthias J. Sax <matthias@confluent.io>
This PR uses the new TaskTopicPartition structure to simplify the build
process for the ApplicationState, which is the input to the new
TaskAssignor#assign call.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw).
This new `StateStore` metric tracks the average and maximum amount of
time between creating and closing Iterators.
Iterators with very high durations can indicate to users performance
problems that should be addressed.
If a store reports no data for these metrics, despite the user opening
Iterators on the store, it suggests those iterators are not being
closed, and have therefore leaked.
Reviewers: Matthias J. Sax <matthias@confluent.io>
For task assignment purposes, the user needs to have a set of information available for each topic partition affecting the desired tasks.
This PR introduces a new interface for a read-only container class that allows all the important and relevant information to be found in one place.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Rack aware graphs don't actually need any topology information about the system, but rather require a simple ordered (not sorted) grouping of tasks.
This PR changes the internal constructors and some interface signatures of RackAwareGraphConstructor and its implementations to allow reuse by future components that may not have access to the actual subtopology information.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This rack information is required to compute rack-aware assignments, which many of the current assigners do.
The internal ClientMetadata class was also edited to pass around this rack information.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw).
This new `StateStore` metric tracks the number of `Iterator` instances
that have been created, but not yet closed (via `AutoCloseable#close()`).
This will aid users in detecting leaked iterators, which can cause major
performance problems.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This PR creates the required methods to post-process the result of TaskAssignor.assign into the required ClientMetadata map. This allows most of the internal logic to remain intact after the user's assignment code runs.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Added unit tests for two processors included in the foreign-key join: SubscriptionSendProcessorSupplier and ForeignTableJoinProcessorSupplier.
Renamed ForeignTableJoinProcessorSupplierTest to SubscriptionJoinProcessorSupplierTest as that's the processor which the test class is testing.
Reviewers: Walker Carlson <wcarlson@apache.org>
The test shouldCloseAllTaskProducersOnCloseIfEosEnabled() in
StreamThreadTest is flaky with a concurrent modification exception.
The concurrent modification exception is caused by the test itself
because it starts a stream thread and at the same time the thread
that executes the test calls methods on the stream thread. The stream
thread was not designed for such concurrency.
The test verifies that under EOS the streams producers are closed
during shutdown. Actually, the test is not needed since we already
have a test that verifies that when the stream thread shuts down,
the task manager also shuts down, and for the task manager we have
tests that verify that the producers are closed when the task manager
shuts down.
This commit verifies that those tests are run under EOS and ALOS.
Reviewer: Matthias J. Sax <matthias@confluent.io>
The graph solving utilities are currently hardcoded to work with ClientState, but don't actually depend on anything in those state classes.
This change allows the MinTrafficGraphConstructor and BalanceSubtopologyGraphConstructor to be reused with KafkaStreamsStates instead.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Almog Gavra <almog@responsive.dev>
Removes the unused remove operation from the state updater
that asynchronously removed tasks and put them into an
output queue.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
This PR changes KafkaStreamsAssignment from an interface to a container class, and implements said class.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This PR implements read-only container classes for ApplicationState and KafkaStreamsState, and initializes those within StreamsPartitionAssignor#assign.
New internal methods were also added to the ClientState to easily pass this data through to the KafkaStreamsState.
One test was added to check the lag sorting within the implementation of KafkaStreamsState, which is the counterpart to the test that existed for the ClientState class.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Kafka Streams is able to handle a ProducerFencedException (among other errors) cleanly. It does this by closing the task dirty and triggering a rebalance amongst the worker threads to rejoin the group. The producer is also recreated. Due to how Streams works (writing to and reading from various topics), the application is able to figure out the last thing the fenced producer completed and continue from there. InvalidPidMappingException should be treated the same way.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Justine Olshan <jolshan@confluent.io>
Uses the new remove operation of the state updater that returns
a future to shutdown the task manager.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Configs default.windowed.value.serde.inner and default.windowed.key.serde.inner
were replaced with windowed.inner.class.serde. This PR updates the docs accordingly,
plus a few more side cleanups.
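For instance, a sketch using the literal config name from above (the inner serde class is an arbitrary example):

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;

public class WindowedSerdeConfig {
    public static Properties example() {
        final Properties props = new Properties();
        // Single new config replacing both deprecated ones:
        props.put("windowed.inner.class.serde", Serdes.StringSerde.class.getName());
        return props;
    }
}
```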
Reviewers: Matthias J. Sax <matthias@confluent.io>
MissingSourceTopicException should contain the name of the missing topic.
There is one corner case for which we don't have the topic name at hand, but we can log the topic
name somewhere else.
Reviewers: Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This is the first PR in a sequence to support custom task assignors in Kafka Streams, which was described in KIP-924. It creates and exposes all of the interfaces that will need to be implemented during the refactor of the current task assignment logic.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Uses the new remove operation of the state updater that returns
a future to remove revoked tasks from the state updater.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Uses the new remove operation of the state updater that returns a future to remove lost tasks from the state updater.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Adds a remove operation to the state updater that returns a future
instead of adding the removed tasks to an output queue. Code that
uses the state updater can then wait on the future.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Currently, the state updater writes multiple tasks per exception in the output
queue for failed tasks. To add the functionality to remove tasks synchronously
from the state updater, it is simpler that each element of the output queue for
failed tasks holds one single task.
This commit refactors the class that holds exceptions and failed tasks
in the state updater -- i.e., ExceptionAndTasks -- to just hold one single
task.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Kafka Streams DSL operators use internal wall-clock based throttling
parameters for performance reasons. These configs make the usage of TTD
difficult: users need to advance the mocked wall-clock time in
their test code, or set these internal configs to zero.
To simplify testing, the TTD should disable both configs automatically.
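Before this change, a test had to advance the mocked wall-clock time itself, roughly like this sketch (topology and timing are placeholders):

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public class TtdExample {
    public static void run(final Topology topology, final Properties props) {
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            // Needed only because wall-clock based throttling held back output;
            // with both internal configs disabled by the TTD automatically,
            // this step becomes unnecessary for that purpose.
            driver.advanceWallClockTime(Duration.ofSeconds(30));
        }
    }
}
```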
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Add support for the DescribeTopicPartitions API to AdminClient. For this initial implementation, we simply load all of the results into memory on the client side.
Reviewers: Andrew Schofield <aschofield@confluent.io>, Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>
Kafka Streams announced the removal of metric forward-rate in
KIP-444 and removed it completely in AK 3.0. However, we forgot
to remove some code for this metric.
This commit removes the code to create the metric forward-rate.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Matthias J. Sax <matthias@confluent.io>
When custom processors are added via StreamsBuilder#addGlobalStore, they will now reprocess all records through the custom transformer instead of loading directly.
We do this so that users who transform the records will not get improperly formatted records downstream.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Omnia Ibrahim <o.g.h.ibrahim@gmail.com>
Co-authored-by: n.izhikov <n.izhikov@vk.team>
There were different spellings for metadata.version, such as "metadata version" or "metadataVersion". Unify the format as metadata.version.
Reviewers: Luke Chen <showuon@gmail.com>
This pull requests migrates the StateDirectory mock in TaskManagerTest from EasyMock to Mockito.
The change is restricted to a single mock to minimize the scope and make it easier for review.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Bruno Cadonna <cadonna@apache.org>
This PR updates the javadocs for the "deprecated" hashCode() method of ProcessorRecordContext, as well as the UnsupportedOperationException thrown in its implementation, to actually explain why the class is mutable and therefore unsafe for use in hash collections. They now point out the mutable field in the class (namely the Headers)
Reviewers: Matthias Sax <mjsax@apache.org>, Bruno Cadonna <cadonna@apache.org>
The previous pull request in this series was #15261.
This pull request continues the migration of the consumer mock in TaskManagerTest test by test for easier reviews.
The next pull request in the series will be #15254 which ought to complete the Mockito migration for the TaskManagerTest class
Reviewer: Bruno Cadonna <cadonna@apache.org>
The previous pull request in this series was #15112.
This pull request continues the migration of the consumer mock in TaskManagerTest test by test for easier reviews.
I envision there will be at least 1 more pull request to clean things up. For example, all calls to taskManager.setMainConsumer should be removed.
Reviewer: Bruno Cadonna <cadonna@apache.org>
Originally, we set the commit interval to MAX_VALUE for this test,
to ensure we only commit explicitly. However, we needed to decrease it
later on when adding the tx-timeout verification.
We did see failing test runs in which the commit interval was hit. This
PR increases the commit interval to close to the test timeout, to
prevent the commit interval from triggering.
Reviewers: Bruno Cadonna <bruno@confluent.io>
Kafka Streams supports asymmetric join windows. Depending on the window configuration
we need to compute the window close time etc. differently.
This PR flips `joinSpuriousLookBackTimeMs`, because the values were not correct, and
introduces the `windowsAfterIntervalMs` field that is used to determine if emitting records can be skipped.
Reviewers: Hao Li <hli@confluent.io>, Guozhang Wang <guozhang.wang.us@gmail.com>, Matthias J. Sax <matthias@confluent.io>
In order to move ConfigCommand to tools we must move all of its dependencies, which include KafkaConfig and other core classes, to Java. This PR moves the log cleaner configuration to the CleanerConfig class of the storage module.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
A foreign-key-join might drop a "subscription response" message, if the value-hash changed.
This PR adds support to record such an event via the existing "dropped records" sensor.
Reviewers: Matthias J. Sax <matthias@confluent.io>
Global state stores are currently flushed at each commit, which may impact performance, especially for EOS (commit every 200ms).
The goal of this improvement is to flush global state stores only when the delta between the current offset and the last checkpointed offset exceeds a threshold.
This is the same logic we apply on local state stores, with a threshold of 10000 records.
The implementation only flushes if the time interval elapsed and the threshold of 10000 records is exceeded.
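A minimal sketch of that decision under the stated assumptions (names are illustrative, not the actual implementation):

```java
public class GlobalStoreFlushDecision {
    private static final long DELTA_THRESHOLD = 10_000L; // records

    public static boolean shouldFlush(final long currentOffset,
                                      final long lastCheckpointedOffset,
                                      final boolean commitIntervalElapsed) {
        // Flush only when the commit interval elapsed AND enough records were
        // appended since the last checkpoint -- mirroring local state stores.
        return commitIntervalElapsed
            && currentOffset - lastCheckpointedOffset > DELTA_THRESHOLD;
    }
}
```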
Reviewers: Jeff Kim <jeff.kim@confluent.io>, Bruno Cadonna <cadonna@apache.org>
a3528a3 removed this log but not the test asserting it.
Builds are currently red because for some reason these tests can't retry. We should address that as a followup.
Reviewers: Greg Harris <greg.harris@aiven.io>, Matthias J. Sax <matthias@confluent.io>
We already record dropped records via metrics, and logging at WARN level
is too noisy. This PR removes the unnecessary logging.
Reviewers: Kalpesh Patel <kpatel@confluent.io>, Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
A performance regression introduced in commit 5bc3aa4 reduces the write performance in RocksDB by ~3x. The bug is that we fail to pass the WriteOptions that disable the write-ahead log into the DB accessor.
For testing, the time to write 10 times 1 million records into one RocksDB each was measured:
Before 5bc3aa4: 7954ms, 12933ms
After 5bc3aa4: 30345ms, 31992ms
After 5bc3aa4 with this fix: 8040ms, 10563ms
On current trunk with this fix: 9508ms, 10441ms
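The essence of the fix, as a sketch against the RocksDB Java API (in the real store the `WriteOptions` instance is long-lived rather than created per call):

```java
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

public class WalDisabledWrite {
    public static void put(final RocksDB db, final byte[] key, final byte[] value)
            throws RocksDBException {
        // Streams relies on changelog topics for durability, so the WAL is
        // disabled; failing to pass these options to the accessor silently
        // re-enables the WAL and costs ~3x write throughput.
        try (final WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {
            db.put(writeOptions, key, value);
        }
    }
}
```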
Reviewers: Bruno Cadonna <bruno@confluent.io>, Nick Telford <nick.telford@gmail.com>
ConsistencyVectorIntegrationTest failed frequently because the Position
returned from IQv2 is not immutable, while the test assumes immutability.
To return a Position with a QueryResult that does not change, we need to
deep copy the Position object.
Reviewers: John Roesler <john@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
HTML code for configs is auto-generated and for Kafka Streams config `state.dir` produces a confusing default value.
This PR adds a new property `alternativeString` to set a "default" value which should be rendered in HTML instead of the actual default value.
Reviewers: Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, @eziosudo <eziosudo@gmail.com>, Matthias J. Sax <matthias@confluent.io>
This PR replaces a HashMap by a ConcurrentHashMap so that the local state store queries can be made from multiple threads. See this for additional context: https://lists.apache.org/thread/gpct1275bfqovlckptn3lvf683qpoxol
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Sagar Rao <sagarmeansocean@gmail.com>
The test is flaky, since sometimes one of the threads hasn't processed a single record, which
causes the ERROR state in the test run (due to unlucky rebalancing).
Reviewers: Bruno Cadonna <bruno@confluent.io>
KAFKA-15629 added `TimestampedByteStore` interface to
`KeyValueToTimestampedKeyValueByteStoreAdapter` which broke the restore
code path and thus some system tests.
This PR reverts this change for now.
Reviewers: Almog Gavra <almog.gavra@gmail.com>, Walker Carlson <wcarlson@confluent.io>
Several problems are still appearing while running 3.7 with
the state updater. This change will disable the state updater
by default also in trunk.
Reviewers: Bruno Cadonna <cadonna@apache.org>
`ColumnFamilyDescriptor` is _not_ a `RocksObject`, which in theory means
it's not backed by any native memory allocated by RocksDB.
However, in practice, `ColumnFamilyHandle#getDescriptor()`, which
returns a `ColumnFamilyDescriptor`, allocates an internal
`rocksdb::db::ColumnFamilyDescriptor`, copying the name and handle of
the column family into it.
Since the Java `ColumnFamilyDescriptor` is not a `RocksObject`, it's not
possible to track this allocation and free it from Java.
Fortunately, in our case, we can simply avoid calling
`ColumnFamilyHandle#getDescriptor()`, since we're only interested in the
column family name, which is already available on
`ColumnFamilyHandle#getName()`, which does not leak memory.
We can also optimize away the temporary `Options`, which was previously a
source of memory leaks, because `userSpecifiedOptions` is an instance of
`Options`.
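The leak-free alternative, sketched (helper name hypothetical):

```java
import java.nio.charset.StandardCharsets;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

public class ColumnFamilyNames {
    public static String nameOf(final ColumnFamilyHandle handle) throws RocksDBException {
        // getName() returns the stored name directly; unlike getDescriptor(),
        // it does not allocate a native descriptor that Java can never free.
        return new String(handle.getName(), StandardCharsets.UTF_8);
    }
}
```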
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
When a standby is recycled to an active and then re-assigned as
a standby again, it might happen that the recycling is still
pending when the standby is reassigned. That causes an illegal
state exception from the main consumer since the active task
that results from the recycling is actually not assigned to
the main consumer anymore, but it was re-assigned as a
standby in the most recent rebalance.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
When a partition is revoked, the corresponding task gets a pending action
"SUSPEND". This pending action may overwrite a previous pending action.
If the task was previously removed from the state updater, e.g. because
we were fenced, the pending action is overwritten with suspend, and in
handleAssigned, upon reassignment of that task, the SUSPEND action is
removed.
Then, once the state updater executes the removal, no pending action
is registered anymore, and we run into an IllegalStateException.
This commit solves the problem by adding back reassigned tasks to the
state updater, since they may have been removed from the state updater
for another reason than being restored completely.
Reviewers: Bruno Cadonna <cadonna@apache.org>
KAFKA-16025 describes the race condition sequence in detail. When this occurs, it can cause the impacted task's initialization to block indefinitely, blocking progress on the impacted task and any other task assigned to the same stream thread. The fix I have implemented is simple: re-check whether a directory is still empty after locking it during the start of rebalancing, and if it is, unlock it immediately. This preserves the idempotency of the method when it coincides with parallel state store cleanup executions.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
* KAFKA-16077: Streams fails to close task after restoration when input partitions are updated
There is a race condition in the state updater that can cause the following:
1. We have an active task in the state updater
2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
3. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back and add a pending action to update its input partition
4. The task is handed back by the state updater. We update its input partitions but forget to close it clean (pending action was overwritten)
5. Now the task is in an initialized state, but the underlying producer does not have transactions initialized
This can cause an IllegalStateException: `Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION` when running in EOSv2.
To fix this, we introduce a new pending action CloseReviveAndUpdateInputPartitions that is added when we handle a new assignment with updated input partitions, but we still need to close the task before reopening it.
We should not remove the task twice, otherwise we'll end up in this situation:
1. We have an active task in the state updater
2. We get fenced. We recreate the producer, transactions now uninitialized. We ask the state updater to give back the task, add a pending action to close the task clean once it’s handed back
3. The state updater moves the task from the updating tasks to the removed tasks
4. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back (adding a task+remove into the task and action queue) and add a pending action to close, revive and update input partitions
5. The task is handed back by the state updater. We close revive and update input partitions, and add the task back to the state updater
6. The state updater executes the "task+remove" action that is still in its task + action queue, and hands the task immediately back to the main thread
7. The main thread discovers a removed task that was not restored and has no pending action attached to it. IllegalStateException
Reviewers: Bruno Cadonna <cadonna@apache.org>
We allocate an `Options` in order to list column families while opening
the `RocksDBStore`, but never explicitly `close()` it.
`Options` is a RocksDB native object, which needs to be explicitly
closed to free the resources it allocates in native memory.
Failing to close this causes a memory leak when repeatedly
opening/closing stores.
It's an `AutoCloseable`, and all usage of it is confined to the
surrounding `try` block, so we can just hook it out to the `try` to
auto-close it when done.
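In sketch form (method and parameter names hypothetical):

```java
import java.util.List;

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class ColumnFamilyListing {
    public static List<byte[]> listColumnFamilies(final DBOptions dbOptions,
                                                  final ColumnFamilyOptions cfOptions,
                                                  final String dbPath) throws RocksDBException {
        // Options is a native object; try-with-resources frees its native
        // memory even if listColumnFamilies throws.
        try (final Options options = new Options(dbOptions, cfOptions)) {
            return RocksDB.listColumnFamilies(options, dbPath);
        }
    }
}
```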
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
To support future use-cases that use different strategies for accessing
RocksDB, we need to de-couple the RocksDB access strategy from the
Column Family access strategy.
To do this, we now have two separate accessors:
* `DBAccessor`: dictates how we access RocksDB. Currently only one
strategy is supported: `DirectDBAccessor`, which accesses RocksDB
directly, via the `RocksDB` class for all operations. In the future, a
`BatchedDBAccessor` will be added, which enables transactions via
`WriteBatch`.
* `ColumnFamilyAccessor`: maps StateStore operations to operations on
one or more column families. This is a rename of the old
`RocksDBDBAccessor`.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Increase the log level to INFO, similar to other log statements in this class, to surface important information on the non-critical code path.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This pull request takes a similar approach to how TaskManagerTest is being migrated to Mockito mock by mock for easier reviews.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This is the corollary to #15061 for outer joins, which don't use timestamped KV stores either (compared to just window stores).
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Lucas Brutschy <lbrutschy@confluent.io>
We attempt to update lags when in state PENDING_SHUTDOWN or PARTITIONS_REVOKED. In these states,
however, our representation of the assignment may not be up-to-date with the subscription
object inside the consumer. This can cause a bug, in particular, when we subscribe to a
set of topics via a regular expression, and the underlying topic is deleted. The consumer
subscription may reflect that topic deletion already, while our internal state still
contains references to the deleted topic, because `onAssignment` has not yet been
executed. Therefore, we will attempt to call `currentLag` on partitions that are not
assigned to us any more inside the consumer, leading to an `IllegalStateException`.
This bug causes flakiness of the test
`RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <cadonna@apache.org>
Before #14648, the KStreamImplJoin class would always create non-timestamped persistent windowed stores. After that PR, the default was changed to create timestamped stores. This wasn't compatible because, during restoration, timestamped stores have their values transformed to prepend the timestamp to the value. This caused serialization errors when trying to read from the store because the deserializers did not expect the timestamp to be prepended.
To fix this, we allow creating non-timestamped stores using the DslWindowParams
Testing was done both manually and by adding a unit test to ensure that the stores created are not timestamped. I also confirmed that the only place in the code persistentWindowStore was used before #14648 was in the StreamJoined code.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
We are not properly closing Closeable resources in the code base in multiple places, especially when an exception occurs. This code change fixes several of these leaks.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Luke Chen <showuon@gmail.com>, Mickael Maison <mickael.maison@gmail.com>
Kafka Streams checkpoints the wrong offset when a task is closed during
restoration. If under exactly-once processing guarantees a
TaskCorruptedException happens, the affected task is closed dirty, its
state content is wiped out and the task is re-initialized. If during
the following restoration the task is closed cleanly, the task writes
the offsets that it stores in its record collector to the checkpoint
file. Those offsets are the offsets that the task wrote to the changelog
topics. In other words, the task writes the end offsets of its changelog
topics to the checkpoint file. Consequently, when the task is
initialized again on the same Streams client, the checkpoint file is
read and the task assumes it is fully restored although the records
between the last offsets the task restored before closing clean and
the end offset of the changelog topics are missing locally.
The fix is to clear the offsets in the record collector on close.
Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
* KAFKA-14412: Generalise over RocksDB WriteBatch
The type hierarchy of RocksDB's `WriteBatch` looks like this:
```
+---------------------+
| WriteBatchInterface |
+---------------------+
^
|
+---------------------+
| AbstractWriteBatch |
+---------------------+
^
|
+----------+----------+
| |
+------------+ +---------------------+
| WriteBatch | | WriteBatchWithIndex |
+------------+ +---------------------+
```
By switching our `BatchWritingStore` methods from `WriteBatch` to
`WriteBatchInterface`, we enable the use of `WriteBatchWithIndex` as
well.
* Improve error reporting for unknown batch type
We should be using an `IllegalStateException`, and we should log a
message informing the user that this is a bug.
This branch should be unreachable, as both of the possible
implementations of `WriteBatchInterface` are matched in the previous two
branches.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This patch bumps the next release version to 3.8.0-SNAPSHOT.
Following the Release Process, I created the 3.7 branch and am following the steps to bump these versions:
Modify the version in trunk to bump to the next one (eg. "0.10.1.0-SNAPSHOT") in the following files:
docs/js/templateData.js
gradle.properties
kafka-merge-pr.py
streams/quickstart/java/pom.xml
streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
streams/quickstart/pom.xml
tests/kafkatest/__init__.py
In many parameterized tests, the display name is broken. For example, testMetadataFetch appears as [1] true, [2] false.
This is because of the default constant in @ParameterizedTest:
String DEFAULT_DISPLAY_NAME = "[{index}] {argumentsWithNames}";
This PR adds a new junit-platform.properties which overrides the default to add a {displayName}, showing the display name of the method.
Existing tests which override the name should work as-is. The precedence rules are:
1) name attribute in @ParameterizedTest, if present
2) value of the junit.jupiter.params.displayname.default configuration parameter, if present
3) DEFAULT_DISPLAY_NAME constant defined in @ParameterizedTest
Source: https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-display-names
Sample test run output:
Before: [1] true
After: testMetadataExpiry(boolean).false
This commit is an extension of bdf6d46b41, which needed to be reverted because it introduced test failures.
Reviewers: David Jacot <djacot@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Part of KIP-714.
Add support to collect client instance id of the global consumer.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This reverts commit bdf6d46b41. We found out that this commit introduced flakiness in Streams' tests. We will revise it.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Part of KIP-714.
Adds support to expose main consumer client instance id.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
Minor cleanup to make it easier to follow the restore listener logic. Currently, the KafkaStreams class tracks two restore listener fields: there is a non-final, nullable "globalRestoreListener" that holds the restore listener specified by the user (if any), and then there is a final "delegatingRestoreListener" that's used to encapsulate the null checks for the user-specified restore listener. It's a bit confusing to follow along with what each of these restore listener fields is doing and the relationship between them when they're on equal footing like this, when in reality they're more hierarchical and the DelegatingRestoreListener is actually a wrapper over the user-specified globalRestoreListener. The term "global" is also a bit misleading as it can get mixed up with global state stores, when it's really meant to be "global" in the sense that it applies to all state stores in the application.
It would be nice to just move the user listener completely inside the DelegatingRestoreListener class and then make that class static, as well as renaming the field to "userRestoreListener"
Reviewers: Matthias J. Sax <mjsax@apache.org>
Implementation for KIP-988, adds the new StandbyUpdateListener interface
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Colt McNealy <colt@littlehorse.io>
Relax non-null FK left-join requirement.
Testing Strategy: Inject extractor which returns null on first or second element.
Reviewers: Walker Carlson <wcarlson@apache.org>
- Part of KIP-714
- Add new configs and public API for Kafka Streams
- Implement support to get admin client instance id
Reviewers: Andrew Schofield <aschofield@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>, Apoorv Mittal <amittal@confluent.io>, Walker Carlson <wcarlson@confluent.io>
RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted does not wait to ensure that test-topic-A is deleted. The second assignment condition times out in 15sec.
We should wait for the topic to be deleted (default timeout = 30sec) and then check the assignment.
Reviewers: Walker Carlson <wcarlson@apache.org>
When opening RocksDB, we were checking for an error in
`RocksDBTimestampedStore` to detect if the `keyValueWithTimestamp` CF is
missing.
The `openRocksDB` method now supports any number of column families, not
just the extra one used by `RocksDBTimestampedStore`. We now check for
the existing column families _before_ opening the database, which allows
us to create any missing column families.
Supporting automatic creation of any number of missing column families
is a pre-requisite for KIP-892: Transactional StateStores.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@apache.org>
This pull request is an attempt to get what was started in #12524 to completion as part of the Streams project migration to Mockito.
Reviewers: Divij Vaidya <diviv@amazon.com>, Bruno Cadonna <cadonna@apache.org>
The following race can happen in the state updater code path:
1) Task is restoring, owned by the state updater.
2) We fall out of the consumer group and lose all partitions.
3) We therefore register a "TaskManager.pendingUpdateAction" to CLOSE_DIRTY.
4) We also register a "StateUpdater.taskAndAction" to remove the task.
5) We get the same task reassigned. Since it's still owned by the state updater, we don't do much.
6) The task completes restoration.
7) The "StateUpdater.taskAndAction" to remove will be ignored, since the task is already restored.
8) Inside "handleRestoredTasksFromStateUpdater", we close the task dirty because of the pending update action.
9) We now have the task assigned, but it's closed.
To fix this particular race, we cancel the "close" pending update action. Furthermore, since we may have made progress in other threads during the missed rebalance, we need to add the task back to the state updater, to at least check if we are still at the end of the changelog. Finally, it seems we do not need to close dirty here, it's enough to close clean when we lose the task, related to KAFKA-10532.
This should fix the flaky EOSIntegrationTest.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Implements KIP-992.
Adds TimestampedKeyQuery and TimestampedRangeQuery (IQv2) for timestamped key-value stores, plus changes the semantics of the existing KeyQuery and RangeQuery if issued against a timestamped key-value store, now unwrapping the value-and-timestamp and only returning the plain value.
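A sketch of issuing the new query via IQv2 (store name and types hypothetical; API shape as described in KIP-992):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.query.TimestampedKeyQuery;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class TimestampedLookup {
    public static ValueAndTimestamp<Long> lookup(final KafkaStreams streams, final String key) {
        final TimestampedKeyQuery<String, Long> query = TimestampedKeyQuery.withKey(key);
        final StateQueryResult<ValueAndTimestamp<Long>> result =
            streams.query(StateQueryRequest.inStore("counts-store").withQuery(query));
        // Unlike a plain KeyQuery against the same store, the result still
        // carries the timestamp alongside the value.
        return result.getOnlyPartitionResult().getResult();
    }
}
```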
Reviewers: Matthias J. Sax <matthias@confluent.io>
This interface provides a common supertype for `StreamThread` and
`DefaultTaskExecutor.TaskExecutorThread`, which will be used by KIP-892
to differentiate between "processing" threads and interactive query
threads.
This is needed because `DefaultTaskExecutor.TaskExecutorThread` is
`private`, so cannot be seen directly from `RocksDBStore`.
Reviewer: Bruno Cadonna <cadonna@apache.org>
Introduce a dummy node connected to every other node and run Bellman-Ford from the dummy node once instead of from every node in the graph.
Reviewers: Qichao Chu (@ex172000), Matthias J. Sax <matthias@confluent.io>
Named topologies are a feature that is planned to be removed from AK in 4.0 and was never used via the public interface. It was used in a few versions of KSQL only, but was disabled there as well. While we do not want to remove it in 3.7 yet, we should disable flaky tests of that feature that are disruptive to AK development.
Reviewers: Bruno Cadonna <cadonna@apache.org>
Implementation for KIP-954: support custom DSL store providers
Testing Strategy:
- Updated the topology tests to ensure that the configuration is picked up in the topology builder
- Manually built a Kafka Streams application using a custom DslStoreSuppliers class and verified that it was used
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang.wang.us@gmail.com>
The state updater waits on a condition variable if no tasks exist that need to be updated. The wait on the condition variable is wrapped in a loop to account for spurious wake-ups, and the check whether updating tasks exist is done in the loop condition. Only the state updater thread can change whether updating tasks exist, but since the state updater thread is the one waiting on the condition variable, the check will always return the same value for as long as it waits. Thus, the check only needs to be done once, before entering the loop.
This commit moves the check before the loop, which also makes the usage of mocks more robust since the processing becomes more deterministic.
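A sketch of the pattern, loosely modeled on the state updater (class and field names illustrative):

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class UpdaterWaitSketch {
    private final ReentrantLock tasksAndActionsLock = new ReentrantLock();
    private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition();
    private final Set<Object> updatingTasks;     // mutated only by this thread
    private final Queue<Object> tasksAndActions; // mutated by other threads

    public UpdaterWaitSketch(final Set<Object> updatingTasks, final Queue<Object> tasksAndActions) {
        this.updatingTasks = updatingTasks;
        this.tasksAndActions = tasksAndActions;
    }

    public void waitForWork() throws InterruptedException {
        // Checked once, outside the loop: only this thread changes it, so its
        // value cannot change while we wait.
        if (!updatingTasks.isEmpty()) {
            return;
        }
        tasksAndActionsLock.lock();
        try {
            // Looped: other threads enqueue work here, and the loop also
            // guards against spurious wake-ups.
            while (tasksAndActions.isEmpty()) {
                tasksAndActionsCondition.await();
            }
        } finally {
            tasksAndActionsLock.unlock();
        }
    }
}
```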
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
A few variables aren't being used anymore but still exist. This commit removes those unused variables.
Co-authored-by: Sagar Rao <sagarrao@Sagars-MacBook-Pro.local>
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
The consumer refactoring project introduced another `Consumer` implementation, creating two different, coexisting implementations of the `Consumer` interface:
* `KafkaConsumer` (AKA "existing", "legacy" consumer)
* `PrototypeAsyncConsumer` (AKA "new", "refactored" consumer)
The goal of this task is to refactor the code via the delegation pattern so that we can keep a top-level `KafkaConsumer` but then delegate to another implementation under the covers. There will be two delegates at first:
* `LegacyKafkaConsumer`
* `AsyncKafkaConsumer`
`LegacyKafkaConsumer` is essentially a renamed `KafkaConsumer`. That implementation handles the existing group protocol. `AsyncKafkaConsumer` is renamed from `PrototypeAsyncConsumer` and will implement the new consumer group protocol from KIP-848. Both of those implementations will live in the `internals` sub-package to discourage their use.
This task is part of the work to implement support for the new KIP-848 consumer group protocol.
Reviewers: Philip Nee <pnee@confluent.io>, Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
This patch fixes the compilation error for JDK 21 introduced in https://github.com/apache/kafka/pull/14708.
Reviewers: Ismael Juma <ismael@juma.me.uk>, David Jacot <djacot@confluent.io>
This is a follow-up from #14659 that ports the windowed classes to use the StoreFactory abstraction as well. There's a side benefit of not duplicating the materialization code for each StreamImpl/CogroupedStreamImpl class as well.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias Sax <mjsax@apache.org>
The task-level commit metrics were removed without deprecation in KIP-447 and the corresponding PR #8218. However, we forgot to update the docs and to remove the code that creates the task-level commit sensor.
This PR removes the task-level commit metrics from the docs and removes the code that creates the task-level commit sensor. The code was effectively dead since it was only used in tests.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
When a task is corrupted, uncorrupted tasks are committed. That is also true for standby tasks. Committing standby tasks actually means that they are checkpointed.
When the state updater is enabled, standbys are owned by the state updater. The stream thread should not checkpoint them when handling corrupted tasks. That is not a big limitation since the state updater periodically checkpoints standbys anyway. Additionally, when handling corrupted tasks the important thing is to commit active running tasks to abort the transaction. Committing standby tasks does not abort any transaction.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
This PR sets up the necessary prerequisites to respect configurations such as dsl.default.store.type and the dsl.store.suppliers.class (introduced in #14648) without requiring them to be passed in to StreamsBuilder#new(TopologyConfig) (passing them only into new KafkaStreams(...)).
It essentially makes StoreBuilder an external-only API, and internally it uses the StoreFactory equivalent. It replaces KeyValueStoreMaterializer with an implementation of StoreFactory that creates the store builder only after configure() is called (in a future PR we will create the missing equivalents for all of the places where the same thing happens for window stores, such as in all the *WindowKStreamImpl classes).
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
[KIP-962](https://cwiki.apache.org/confluence/display/KAFKA/KIP-962%3A+Relax+non-null+key+requirement+in+Kafka+Streams)
The key requirements were relaxed for the following Streams DSL operators (a short sketch follows the list):
left join KStream-KStream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
outer join KStream-KStream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value.
left-foreign-key join KTable-KTable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value.
left join KStream-KTable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
left join KStream-GlobalTable: no longer drop records when KeyValueMapper returns 'null' and call ValueJoiner with 'null' for right value.
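For illustration, a sketch of the first case (topic names and value types hypothetical):

```java
import java.time.Duration;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class NullKeyLeftJoin {
    public static void build(final StreamsBuilder builder) {
        final KStream<String, String> left = builder.stream("left-topic");
        final KStream<String, String> right = builder.stream("right-topic");
        // With KIP-962, a left record with a null key is no longer dropped:
        // the ValueJoiner is called with null for the right value.
        left.leftJoin(
                right,
                (leftValue, rightValue) -> leftValue + "/" + rightValue, // rightValue may be null
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)))
            .to("output-topic");
    }
}
```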
Reviewers: Walker Carlson <wcarlson@apache.org>
Part of KIP-985.
Updates JavaDocs for `RangeQuery` and `ReadOnlyKeyValueStore` with regard to ordering guarantees.
Updates Kafka Streams upgrade guide with KIP information.
Reviewer: Matthias J. Sax <matthias@confluent.io>
Trying to fix flakiness for the shouldInvokeUserDefinedGlobalStateRestoreListener test introduced in #14519.
Fixes are:
- Do not use static membership.
- Always close the 2nd KafkaStreams instance.
- Wait for the Kafka Streams instance to transition to a RUNNING state before proceeding.
- Added logging for the StateRestoreListener implementation.
- Reduce restore consumer MAX_POLL_RECORDS.
Reviewers: Anna Sophie Blee-Goldman <sophie@responsive.dev>
After finishing restoration, we should only log the active tasks. Standby tasks are not part of restoration and it can be confusing to see them show up on this log message.
Reviewers: Matthias J. Sax <matthias@confluent.io>
- Introduce a new internal config flag to enable processing threads
- If enabled, create a scheduling task manager inside the normal task manager (renamings will be added on top of this), and use it from the stream thread
- All operations inside the task manager that change task state lock the corresponding tasks if processing threads are enabled.
- Adds a new abstract class AbstractPartitionGroup. We can modify the underlying implementation depending on the synchronization requirements. PartitionGroup is the unsynchronized subclass that is going to be used by the original code path. The processing thread code path uses a trivially synchronized SynchronizedPartitionGroup that uses object monitors. Further down the road, there is the opportunity to implement a weakly synchronized alternative. The details are complex, but since the implementation is essentially a queue + some other things, it should be feasible to implement this lock-free.
- Refactorings in StreamThreadTest: Make all tests use the thread member variable and add tearDown in order to avoid thread leaks and to simplify debugging. Make the test parameterized on two internal flags: state updater enabled and processing threads enabled. Use JUnit's assume to disable all tests that do not apply.
Enable some integration tests with processing threads enabled.
Reviewer: Bruno Cadonna <bruno@confluent.io>