This patch includes metadata wait time in total blocked time. First, this patch adds a new metric for total producer time spent waiting on metadata, called metadata-wait-time-ms-total. Then, this time is included in the total blocked time computed from StreamsProducer.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
The caching store layers were passing down writes into lower store layers upon eviction, but not setting the context to the evicted records' context. Instead, the context was from whatever unrelated record was being processed at the time.
Reviewers: Matthias J. Sax <mjsax@apache.org>
Reviewers: Hao Li <hli@confluent.io>, A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>
This PR is part of KIP-708 and adds rack aware standby task assignment logic.
Rack aware standby task assignment won't be functional until all parts of this KIP gets merged.
Splitting PRs into three smaller PRs to make the review process easier to follow. Overall plan is the following:
⏭️ Rack aware standby task assignment logic #10851⏭️ Protocol change, add clientTags to SubscriptionInfoData #10802👉 Add required configurations to StreamsConfig (public API change, at this point we should have full functionality)
This PR implements last point of the above mentioned plan.
Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This test was falling occasionally. It does appear to be a matter of the tests assuming perfecting deduplication/caching when asserting the test output records, ie a bug in the test not in the real code. Since we are not assuming that it is going to be perfect I changed the test to make sure the records we expect arrive, instead of only those arrive.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
I found a couple of flakiness with the integration test.
IQv1 on stores failed although getting the store itself is covered with timeouts, since the InvalidStoreException is upon the query (store.all()). I changed to the util function with IQv2 whose timeout/retry covers the whole procedure. Example of such failure is: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11802/11/tests/
With ALOS we should not check that the output, as well as the state store content is exactly as of processed once, since it is possible that during processing we got spurious task-migrate exceptions and re-processed with duplicates. I actually cannot reproduce this error locally, but from the jenkins errors it seems possible indeed. Example of such failure is: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11433/4/tests/
Some minor cleanups.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>
We added `currentSystemTimeMs()` and `currentStreamTimeMs()` to the
`ProcessorContext` via KIP-622, but forgot to add both to the new
`api.ProcessorContext`.
Reviewers: Ricardo Brasil <anribrasil@gmail.com>, Guozhang Wang <guozhang@confluent.io>
Initial State store implementation for TimedWindow and SlidingWindow.
RocksDBTimeOrderedWindowStore.java contains one RocksDBTimeOrderedSegmentedBytesStore which contains index and base schema.
PrefixedWindowKeySchemas.java implements keyschema for time ordered base store and key ordered index store.
Reviewers: James Hughes, Guozhang Wang <wangguoz@gmail.com>
In this test, we started Kafka Streams app and then write to input topic in transaction. It's possible when streams commit offset, transaction hasn't finished yet. So the streams committed offset could be less than the eventual endOffset.
This PR moves the logic of writing to input topic before starting streams app.
Reviewers: John Roesler <vvcephei@apache.org>
This reverts commit 2ccc834faa.
This reverts commit 2ccc834. We were seeing serious regressions in our state heavy benchmarks. We saw that our state heavy benchmarks were experiencing a really bad regression. The State heavy benchmarks runs with rolling bounces with 10 nodes.
We regularly saw this exception: java.lang.OutOfMemoryError: Java heap space
I ran through a git bisect and found this commit. We verified that the commit right before did not have the same issues as this one did. I then reverted the problematic commit and ran the benchmarks again on this commit and did not see any more issues. We are still looking into the root cause, but for now since this isn't a critical improvement so we can remove it temporarily.
Reviewers: Bruno Cadonna <cadonna@confluent.io>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, David Jacot <djacot@confluent.io>, Ismael Juma <ismael@confluent.io>
This test has started to become flaky at a relatively low, but consistently reproducible, rate. Upon inspection, we find this is due to IOExceptions during the #cleanUpNamedTopology call -- specifically, most often a DirectoryNotEmptyException with an ocasional FileNotFoundException
Basically, signs pointed to having returned from/completed the #removeNamedTopology future prematurely, and moving on to try and clear out the topology's state directory while there was a streamthread somewhere that was continuing to process/close its tasks.
I believe this is due to updating the thread's topology version before we perform the actual topology update, in this case specifically the act of eg clearing out a directory. If one thread updates its version and then goes to perform the topology removal/cleanup when the second thread finishes its own topology removal, this other thread will check whether all threads are on the latest version and complete any waiting futures if so -- which means it can complete the future before the first thread has actually completed the corresponding action
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Quick fix to make sure we log the actual source of the failure both in the actual log message as well as the StreamsException that we bubble up to the user's exception handler, and also to report the offending topology by filling in the StreamsException's taskId field.
Also prevents a NoSuchElementException from being thrown when trying to compute the minimum topology version across all threads when the last thread is being unregistered during shutdown.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
We used to call TopologyMetadata#maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion when a thread was being unregistered/shutting down, to check if any of the futures listening for topology updates had been waiting on this thread and could be completed. Prior to invoking this we make sure to remove the current thread from the TopologyMetadata's threadVersions map, but this thread is actually then re-added in the #maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion call.
To fix this, we should break up this method into separate calls for each of its two distinct functions, updating the version and checking for topology update completion. When unregistering a thread, we should only invoke the latter method
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
While debugging the flaky NamedTopologyIntegrationTest. shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing test, I did discover one real bug. The problem was that we update the TopologyMetadata's builders map (with the known topologies) inside the #removeNamedTopology call directly, whereas the StreamThread may not yet have reached the poll() in the loop and in case of an offset reset, we get an NP.e
I changed the NPE to just log a warning for now, going forward I think we should try to tackle some tech debt by keeping the processing tasks and the TopologyMetadata in sync
Also includes a quick fix on the side where we were re-adding the topology waiter/KafkaFuture for a thread being shut down
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
This test has been failing somewhat regularly due to going into the ERROR state before reaching RUNNING during the startup phase. The problem is that we are reusing the DELAYED_INPUT_STREAM topics, which had previously been assumed to be uniquely owned by a particular test. We should make sure to delete and re-create these topics for any test that uses them.
Currently the #add/removeNamedTopology APIs behave a little wonky when the application is still in CREATED. Since adding and removing topologies runs some validation steps there is valid reason to want to add or remove a topology on a dummy app that you don't plan to start, or a real app that you haven't started yet. But to actually check the results of the validation you need to call get() on the future, so we need to make sure that get() won't block forever in the case of no failure -- as is currently the case
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
This PR is part of KIP-708 and adds rack aware standby task assignment logic.
Reviewer: Bruno Cadonna <cadonna@apache.org>, Luke Chen <showuon@gmail.com>, Vladimir Sitnikov <vladimirsitnikov.apache.org>
Pretty much any time we have an integration test failure that's flaky or only exposed when running on Jenkins through the PR builds, it's impossible to debug if it cannot be reproduced locally as the logs attached to the test results have truncated the entire useful part of the logs. This is due to the logs being flooded at the beginning of the test when the Kafka cluster is coming up, eating up all of the allotted characters before we even get to the actual Streams test. Setting log4j.logger.kafka to ERROR greatly improves the situation and cuts down on most of the excessive logging in my local runs. To improve things even more and have some hope of getting the part of the logs we actually need, I also set the loggers for all of the Config objects to ERROR, as these print out the value of every single config (of which there are a lot) and are not useful as we can easily figure out what the configs were if necessary by just inspecting the test locally.
Reviewers: Luke Chen <showuon@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Seen a few of the new tests added fail on PR builds lately with
"java.lang.AssertionError: Expected all streams instances in [org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper@7fb3e6b0] to be RUNNING within 30000 ms"
We already had some tests using the 30s timeout while others were bumped all the way up to 60s, I figured we should try out a default timeout of 45s and if we still see failures in specific tests we can go from there
This PR addresses the remaining nits from the final review of #11787
It also deletes two integration test classes which had only one test in them, and moves the tests to another test class file to save on the time to bring up an entire embedded kafka cluster just for a single run
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
We should be able to change the topologies while still in the CREATED state. We already allow adding them, but this should include removing them as well
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Quick followup to #11787 to optimize the impact of the task backoff by reducing the time to replace a thread. When a thread is going through a dirty close, ie shutting down from an uncaught exception, we should be sending a LeaveGroup request to make sure the broker acknowledges the thread has died and won't wait up to the `session.timeout` for it to join the group if the user opts to `REPLACE_THREAD` in the handler
Reviewers: Walker Carlson <wcarlson@confluent.io>, John Roesler <vvcephei@apache.org>
Part 1 in the initial series of error handling for named topologies.
*Part 1: Track tasks with errors within a named topology & implement constant-time based task backoff
Part 2: Implement exponential task backoff to account for recurring errors
Part 3: Pause/backoff all tasks within a named topology in case of a long backoff/frequent errors for any individual task
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
When we hit an exception when processing tasks we should save the work we have done so far.
This will only be relevant with ALOS and EOS-v1, not EOS-v2. It will actually reduce the number of duplicated record in ALOS because we will not be successfully processing tasks successfully more than once in many cases.
This is currently enabled only for named topologies.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang@confluent.io>
Tests are swallowing exceptions for supported operating systems, which could hide regressions.
Co-authored-by: Rob Leland <rleland@apache.org>
Reviewer: Bruno Cadonna <cadonna@apache.org>
Basic refactoring with no logical changes to lay the groundwork & facilitate reviews for error handling work.
This PR just moves all methods that go beyond the management of tasks into a new TaskExecutor class, such as processing, committing, and punctuating. This breaks up the ever-growing TaskManager class so it can focus on the tracking and updating of the tasks themselves, while the TaskExecutor can focus on the actual processing. In addition to cleaning up this code this should make it easier to test this part of the code.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
For debugging it is useful to see the actual state directory when
an exception regarding the state directory is thrown.
Reviewer: Bill Bejeck <bbejeck@apache.org>
Previously we were only verifying the new query could be added after we had already inserted it into the TopologyMetadata, so we need to move the validation upfront.
Also adds a test case for this and improves handling of NPE in case of future or undiscovered bugs.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
RocksDB v6.27.3 has been released and it is the first release to support s390x. RocksDB is currently the only dependency in gradle/dependencies.gradle without s390x support.
RocksDB v6.27.3 has added some new options that require an update to streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java but no other changes are needed to upgrade.
I have run the unit/integration tests locally on s390x and also the :streams tests on x86_64 and they pass.
Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>
Co-authored-by: Kurt Ostfeld <kurt@samba.tv>
Reviewers: Kvicii <Karonazaba@gmail.com>, Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
This PR is an implementation of: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390. The following changes have been made:
* Adding a new config input.buffer.max.bytes applicable at a topology level.
* Adding new config statestore.cache.max.bytes.
* Adding new metric called input-buffer-bytes-total.
* The per partition config buffered.records.per.partition is deprecated.
* The config cache.max.bytes.buffering is deprecated.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang@confluent.io>
There are cases in which a state store neither has an in-memory position built up nor has it gone through the state restoration process. If a store is persistent (i.e., RocksDB), and we stop and restart Streams, we will have neither of those continuity mechanisms available.
This patch:
* adds a test to verify that all stores correctly recover their position after a restart
* implements storage and recovery of the position for persistent stores alongside on-disk state
Reviewers: Vicky Papavasileiou <vpapavasileiou@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <guozhang@apache.org>, John Roesler <vvcephei@apache.org>
Currently, IQv2 forwards all queries to the underlying store. We add this bypass to allow handling of key queries in the cache. If a key exists in the cache, it will get answered from there.
As part of this PR, we realized we need access to the position of the underlying stores. So, I added the method getPosition to the public API and ensured all state stores implement it. Only the "leaf" stores (Rocks*, InMemory*) have an actual position, all wrapping stores access their wrapped store's position.
Reviewers: Patrick Stuedi <pstuedi@apache.org>, John Roesler <vvcephei@apache.org>
There is a brief window between when the store is registered and when
it is initialized when it might handle a query, but there is no context.
We treat this condition just like a store that hasn't caught up to the
desired position yet.
Reviewers: Guozhang Wang <guozhang@apache.org>, Matthias J. Sax <mjsax@apache.org>, A. Sophie Blee-Goldman <ableegoldman@apache.org>, Patrick Stuedi <pstuedi@apache.org>
Followup to #11600 to invoke the streams exception handler on the MissingSourceTopicException, without killing/replacing the thread
Reviewers: Guozhang Wang <guozhang@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
Fixes some issues with the NamedTopology version of the IQ methods that accept a topologyName argument, and adds tests for all.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Avoid throwing a MissingSourceTopicException inside the #assign method when named topologies are used, and just remove those topologies which are missing any of their input topics from the assignment.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
During some recent reviews, @mjsax pointed out that StateStore layers
are constructed differently the stores are added via the PAPI vs. the DSL.
This PR adds PAPI construction for Window and Session stores to the
IQv2StoreIntegrationTest so that we can ensure IQv2 works on every
possible state store.
Reviewer: Guozhang Wang <guozhang@apache.org>
During some recent reviews, @mjsax pointed out that StateStore layers
are constructed differently the stores are added via the PAPI vs. the DSL.
This PR adds KeyValueStore PAPI construction to the
IQv2StoreIntegrationTest so that we can ensure IQv2 works on every
possible state store.
Reviewers: Patrick Stuedi <pstuedi@apache.org>, Guozhang Wang <guozhang@apache.org>
Implement the KeyQuery as proposed in KIP-796
Reviewers: Vicky Papavasileiou <vpapavasileiou@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <guozhang@apache.org>
When this was made I didn't expect deleteOffsetsResult to be set if an exception was thrown. But it is and to retry we need to reset it to null. Changing the KafkaStreamsNamedTopologyWrapper for remove topology when resetting offsets to retry upon GroupSubscribedToTopicException and swallow/complete upon GroupIdNotFoundException
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@ache.>
Update an unclear log message and method name in TopologyMetadata
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <guozhang@confluent.io>, Luke Chen <showuon@confluent.io>
* Fill in the Position response in the IQv2 result.
* Enforce PositionBound in IQv2 queries.
* Update integration testing approach to leverage consistent queries.
Reviewers: Patrick Stuedi <pstuedi@apache.org>, Vicky Papavasileiou <vpapavasileiou@confluent.io>, Guozhang Wang <guozhang@apache.org>
Fix for one of the causes of failure in the NamedTopologyIntegrationTest: org.apache.kafka.streams.errors.StreamsException: java.lang.IllegalStateException: Must initialize prevActiveTasks from ownedPartitions before initializing remaining tasks.
This exception could occur if a member sent in a subscription where all of its ownedPartitions were from a named topology that is no longer recognized by the group leader, eg because it was just removed from the client. We should filter each ClientState based on the current topology only so the assignor only processes the partitions/tasks it can identify. The member with the out-of-date tasks will eventually clean them up when the #removeNamedTopology API is invoked on them
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Use the name specified via consumed parameter in InternalStreamsBuilder#addGlobalStore method for initializing the source name and processor name. If not specified, the names are generated.
Reviewers: Luke Chen <showuon@gmail.com>, Bill Bejeck <bbejeck@apache.org>
In the new NamedTopology API being worked on, state store names and their uniqueness requirement is going to be scoped only to the owning topology, rather than to the entire app. In other words, two different named topologies can have different state stores with the same name.
This is going to cause problems for some of the existing IQ APIs which require only a name to resolve the underlying state store. We're now going to need to take in the topology name in addition to the state store name to be able to locate the specific store a user wants to query
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Introduces changelog headers to pass position information
to standby and restoring stores. The headers are guarded by an internal
config, which defaults to `false` for backward compatibility. Once IQv2
is finalized, we will make that flag a public config.
Reviewers: Patrick Stuedi <pstuedi@apache.org>, John Roesler <vvcephei@apache.org>
When the unit tests of the internal topic manager test
are executed on a slow machine (like sometimes in automatic builds)
they sometimes fail with a timeout exception instead of the expected
exception. To fix this behavior, this commit replaces the use of
system time with mock time.
Reviewer: John Roesler <vvcephei@apache.org>
Implements the major part of the IQv2 framework as proposed in KIP-796.
Reviewers: Patrick Stuedi <pstuedi@apache.org>, Vicky Papavasileiou <vpapavasileiou@confluent.io>, Bruno Cadonnna <cadonna@apache.org>
Log messages were changed in the AssignorConfiguration (#11490) that are
also used for verification in system test
StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance.
This commit fixes the test and adds comments to the log messages
that point to the test that needs to be updated in case of
changes to the log messages.
Reviewers: John Roesler <vvcephei@apache.org>, Luke Chen <showuon@gmail.com>, David Jacot <djacot@confluent.io>
With a nice mock in RocksDBMetricsRecorderTest#shouldCorrectlyHandleHitRatioRecordingsWithZeroHitsAndMisses() and RocksDBMetricsRecorderTest#shouldCorrectlyHandleAvgRecordingsWithZeroSumAndCount() were green although getTickerCount() was never called. The tests were green because EasyMock returns 0 for a numerical return value by default if no expectation is specified. Thus, commenting out the expectation for getTickerCount() did not change the result of the test.
This commit changes the mock to a default mock and fixes the expectation to expect getAndResetTickerCount(). Now, commenting out the expectation leads to a test failure.
Reviewers: Luizfrf3 <lf.fonseca@hotmail.com>, Guozhang Wang <wangguoz@gmail.com>
Deprecate eager rebalancing protocol in kafka streams and log warning message when upgrade.from is set to 2.3 or lower. Also add a note in upgrade doc to prepare users for the removal of eager rebalancing support
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Add additional asserts for KafkaStreamsTest: testInitializesAndDestroysMetricsReporters to help diagnose if it flakily fails in the future.
- MockMetricsReporter gets initialized only once during KafkaStreams construction, so make assert check stricter by ensuring initDiff is one.
- Assert KafkaStreams is not running before we validate whether MockMetricsMetricsReporter close count got incremented after streams close.
Reviewer: Bruno Cadonna <cadonna@apache.org>
Most configs that are read and used by Streams today originate from the properties passed in to the KafkaStreams constructor, which means they get applied universally across all threads, tasks, subtopologies, and so on. The only current exception to this is the topology.optimization config which is parsed from the properties that get passed in to StreamsBuilder#build. However there are a handful of configs that could also be scoped to the topology level, allowing users to configure each NamedTopology independently of the others, where it makes sense to do so.
This PR refactors the handling of these configs by interpreting the values passed in via KafkaStreams constructor as the global defaults, which can then be overridden for individual topologies via the properties passed in when building the NamedTopology. More topology-level configs may be added in the future, but this PR covers the following:
max.task.idle.ms
task.timeout.ms
buffered.records.per.partition
default.timestamp.extractor.class
default.deserialization.exception.handler
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Walker Carlson <wcarlson@confluent.io>, Luke Chen <showuon@confluent.io>
* Migrate KTable joins to new Processor API.
* Migrate missing KTableProcessorSupplier implementations.
* Replace KTableProcessorSupplier with new Processor API implementation.
Reviewers: John Roesler <vvcephei@apache.org>
This PR adds some RocksDB metrics that could not be added in KIP-471 due to RocksDB version. The new metrics are extracted using Histogram data provided by RocksDB API, and the old ones were extracted using Tickers. The new metrics added are memtable-flush-time-(avg|min|max) and compaction-time-(avg|min|max).
Reviewer: Bruno Cadonnna <cadonna@apache.org>
Minor followup to #11405 / KIP-783 to write down the new guarantees we're providing about the meaning of a StreamsException in the javadocs of that class
Reviewers: Bruno Cadonna <cadonna@apache.org>
As raised in KAFKA-12994, All tests that use the old API should be either eliminated or migrated to the new API in order to remove the @SuppressWarnings("deprecation") annotations.
This PR migrates the SlidingWindowsTest to the new API.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
To help users distinguish which task an exception was thrown from, and which NamedTopology if it exists, we add a TaskId field to the StreamsException class. We then make sure that all exceptions thrown to the handler are wrapped as StreamsExceptions, to help the user simplify their handling code as they know they will always need to unwrap the thrown exception exactly once.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Luke Chen <showuon@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, John Roesler <vvcephei@apache.org>
Disallow calling grace() if it was already set via ofTimeDifferenceAndGrace/WithNoGrace(). Add the check to disallow grace called after grace set via new API, and add tests for them.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
We observed on the broker side that txn-offset-commit request with empty topics are received. After checking the source code I found there's on place on Streams which is unnecessarily sending empty offsets. This PR cleans up the streams layer logic a bit to not send empty offsets, and at the same time also guard against empty offsets at the producer layer as well.
Reviewers: Luke Chen <showuon@gmail.com>, Jason Gustafson <jason@confluent.io>
As part of the migration of KStream/KTable operations to the new
Processor API https://issues.apache.org/jira/browse/KAFKA-8410,
this PR includes the migration of KStream aggregate/reduce operations.
Reviewers: John Roesler <vvcephei@apache.org>
Recently a user hit this TaskAssignmentException due to a bug in their regex that meant no topics matched the pattern subscription, which in turn meant that it was impossible to resolve the number of partitions of the downstream repartition since there was no upstream topic to get the partition count for. Debugging this was pretty difficult and ultimately came down to stepping through the code line by line, since even with TRACE logging we only got a partial picture.
We should expand the logging to make sure the TRACE logging hits both conditional branches, and improve the error message with a suggestion for what to look for should someone hit this in the future
Reviewers: Walker Carlson <wcarlson@confluent.io>, Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
In #9139, we added backward iterator on SessionStore. But there is a bug that when fetch/backwardFetch the key range, if there are multiple records in the same session window, we can't return the data in the correct order.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman<apache.org>
This PR aims to utilize HighAvailabilityTaskAssignor to avoid downtime on corrupted tasks. The idea is that, when we hit TaskCorruptedException on an active task, a rebalance is triggered after we've wiped out the corrupted state stores. This will allow the assignor to temporarily redirect this task to another client who can resume work on the task while the original owner works on restoring the state from scratch.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
As raised in KAFKA-12994, All tests that use the old API should be either eliminated or migrated to the new API in order to remove the @SuppressWarnings("deprecation") annotations. This PR will migrate over all the relevant tests in TimeWindowsTests.java
Reviewers: Anna Sophie Blee-Goldman
As part of the migration of KStream/KTable operations to the new Processor API https://issues.apache.org/jira/browse/KAFKA-8410, this PR includes the migration of KTable aggregate/reduce operations.
Reviewers: John Roesler <vvcephei@apache.org>
Add support for infinite range query for WindowStore. Story JIRA: https://issues.apache.org/jira/browse/KAFKA-13210
Reviewers: Patrick Stuedi <pstuedi@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
KAFKA-13246: StoreQueryIntegrationTest#shouldQueryStoresAfterAddingAndRemovingStreamThread does not gate on stream state well
The test now waits for the client to transition to REBALANCING/RUNNING after adding/removing a thread as well as to transition to RUNNING before querying the state store.
Reviewers: singingMan <@3schwartz>, Walker Carlson <wcarlson@confluent.io>, John Roesler <vvcephei@apache.org>
This is to make sure that even if logging is disabled, we would still return null in order to workaround the deserialization issue for stream-stream left/outer joins.
Reviewers: Matthias J. Sax <mjsax@apache.org>
This is an alternative approach in parallel to #11235. After several unsuccessful trials to improve its efficiency i've come up with a larger approach, which is to use a kv-store instead as the shared store, which would store the value as list. The benefits of this approach are:
Only serde once that compose <timestamp, byte, key>, at the outer metered stores, with less byte array copies.
Deletes are straight-forward with no scan reads, just a single call to delete all duplicated <timestamp, byte, key> values.
Using a KV store has less space amplification than a segmented window store.
The cons though:
Each put call would be a get-then-write to append to the list; also we would spend a few more bytes to store the list (most likely a singleton list, and hence just 4 more bytes).
It's more complicated definitely.. :)
The main idea is that since the shared store is actively GC'ed by the expiration logic, not based on time retention, and since that the key format is in <timestamp, byte, key>, the range expiration query is quite efficient as well.
Added testing covering for the list stores (since we are still use kv-store interface, we cannot leverage on the get() calls in the stream-stream join, instead we use putIfAbsent and range only). Another minor factoring piggy-backed is to let toList to always close iterator to avoid leaking.
Reviewers: Sergio Peña <sergio@confluent.io>, Matthias J. Sax <mjsax@apache.org>
When introducing backward iterator for WindowStroe in #9138, we forgot to make "each segment" in reverse order (i.e. in descendingMap) in InMemoryWindowStore. Fix it and add integration tests for it.
Currently, in Window store, we store records in [segments -> [records] ].
For example:
window size = 500,
input records:
key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window
So, internally, the "a" and "b" will be in the same segment, and "c" in another segments.
segments: [0 /* window start */, records], [500, records].
And the records for window start 0 will be "a" and "b".
the records for window start 500 will be "c".
Before this change, we did have a reverse iterator for segments, but not in "records". So, when doing backwardFetchAll, we'll have the records returned in order: "c", "a", "b", which should be "c", "b", "a" obviously.
Reviewers: Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com>, Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <wangguoz@gmail.com>
When using EOS checkpointed offsets are not updated to the latest offsets from the changelog because the maybeWriteCheckpoint method is only ever called when commitNeeded=false. This change will force the update if enforceCheckpoint=true .
I have also added a test which verifies that both the state store and the checkpoint file are completely up to date with the changelog after the app has shutdown.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Guozhang Wang <wangguoz@gmail.com>
KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns
Implementation of KIP-773
Deprecates inconsistent metrics bufferpool-wait-time-total,
io-waittime-total, and iotime-total.
Introduces new metrics bufferpool-wait-time-ns-total,
io-wait-time-ns-total, and io-time-ns-total with the same semantics as
before.
Adds metrics (old and new) in ops.html.
Adds upgrade guide for these metrics.
Reviewers: Guozhang Wang <wangguoz@gmail.com>, Tom Bentley <tbentley@redhat.com>
Description
Streams disables the write-ahead log (WAL) provided by RocksDB since it replicates the data in changelog topics. Hence, it does not make much sense to set WAL-related configs for RocksDB.
Proposed solution
Ignore any WAL-related configuration and state in the log that we are ignoring them.
Co-authored-by: Tomer Wizman <tomer.wizman@personetics.com>
Co-authored-by: Bruno Cadonna <cadonna@apache.org>
Reviewers: Boyang Chen <boyang@apache.org>, Bruno Cadonna <cadonna@apache.org>
As detailed in KAFKA-12994, unit tests using the old API should be either removed or migrated to the new API.
This PR migrates relevant tests in JoinWindowsTest.java and SessionWindowsTest.java.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Migrate Suppress as part of the migration of KStream/KTable
operations to the new Processor API (KAFKA-8410)
Reviewers: John Roesler <vvcephei@apache.org>
Currently KStreamMap and KStreamFlatMap classes will throw NPE if the call to KeyValueMapper#apply() return null. This commit checks whether the result of KeyValueMapper#apply() is null and throws a more meaningful error message for better debugging.
Two unit tests are also added to check if we successfully captured nulls.
Reviewers: Josep Prat <josep.prat@aiven.io>, Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>
Instead of letting all RuntimeExceptions go through and be processed by the uncaught exception handler, IllegalStateException and IllegalArgumentException are not passed through and fail fast. In this PR when setting the uncaught exception handler we check if the exception is in an "exclude list", if so, we terminate the client, otherwise we continue as usual.
Added test checking this new case. Added integration test checking that user defined exception handler is not used when an IllegalStateException is thrown.
Reviewers: Bruno Cadonna <cadonna@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
* Add the following producer metrics:
flush-time-total: cumulative sum of time elapsed during in flush.
txn-init-time-total: cumulative sum of time elapsed during in initTransactions.
txn-begin-time-total: cumulative sum of time elapsed during in beginTransaction.
txn-send-offsets-time-total: cumulative sum of time elapsed during in sendOffsetsToTransaction.
txn-commit-time-total: cumulative sum of time elapsed during in commitTransaction.
txn-abort-time-total: cumulative sum of time elapsed during in abortTransaction.
* Add the following consumer metrics:
commited-time-total: cumulative sum of time elapsed during in committed.
commit-sync-time-total: cumulative sum of time elapsed during in commitSync.
* Add a total-blocked-time metric to streams that is the sum of:
consumer’s io-waittime-total
consumer’s iotime-total
consumer’s committed-time-total
consumer’s commit-sync-time-total
restore consumer’s io-waittime-total
restore consumer’s iotime-total
admin client’s io-waittime-total
admin client’s iotime-total
producer’s bufferpool-wait-time-total
producer's flush-time-total
producer's txn-init-time-total
producer's txn-begin-time-total
producer's txn-send-offsets-time-total
producer's txn-commit-time-total
producer's txn-abort-time-total
Reviewers: Bruno Cadonna <cadonna@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Add a new case to the list of possible retriable exceptions for the flaky tests to take care of threads starting up
Reviewers: Leah Thomas <lthomas@confluent.io>, Anna Sophie Blee-Goldman
The GraphNode#writeToTopology method accepts a Properties input parameter, but never uses it in any of its implementations. We can remove this parameter to clean things up and help make it clear that writing nodes to the topology doesn't involve the app properties.
Reviewers: Bruno Cadonna <cadonna@confluent.io>
Currently in OrderedBytes#upperRange method, we'll check key bytes 1 by 1, to see if there's a byte value >= first timestamp byte value, so that we can skip the following key bytes, because we know compareTo will always return 0 or 1. However, in most cases, the first timestamp byte is always 0, more specifically the upperRange is called for both window store and session store. For former, the suffix is in timestamp, Long.MAX_VALUE and for latter the suffix is in Long.MAX_VALUE, timestamp. For Long.MAX_VALUE the first digit is not 0, for timestamp it could be 0 or not, but as long as it is up to "now" (i.e. Aug. 23rd) then the first byte should be 0 since the value is far smaller than what a long typed value could have. So in practice for window stores, that suffix's first byte has a large chance to be 0, and hence worth optimizing for.
This PR optimizes the not all query cases by not checking the key byte 1 by 1 (because we know the unsigned integer will always be >= 0), instead, put all bytes and timestamp directly. So, we won't have byte array copy in the end either.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
The alive stream threads metric relies on the threads field as a monitor object for
its synchronized block. When the alive stream threads metric is registered it isn't
initialised so any call to get the metric value before it is initialised will result
in a null pointer exception.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Walker Carlson <wcarlson@confluent.io>
We increased the default session timeout to 30s in KIP-735:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout
Since then, we are observing sporadic system test failures
due to rebalances taking longer than the test timeout.
Rather than increase the test wait times, we can just override
the session timeout to a value more appropriate in the testing
domain.
Reviewers: A. Sophie Blee-Goldman <ableegoldman@apache.org>
1. This test is taking two iterations since the firs iteration is designed to fail due to unknow topic leader. However both the timeout and the backoff are set to 50ms, while the actual SYSTEM time is used. This means we are likely to timeout before executing the second iteration. I thought about using a mock time but dropped that idea as it may forgo the purpose of this test, instead I set the backoff time to 10ms so that we are almost unlikely to hit this error anymore.
2. Found a minor issue while fixing this which is that when we have non-empty not-ready topics, but the topics-to-create is empty (which is possible as the test shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound itself illustrates), we still call an empty create-topic function. Though it does not incur any round-trip it would still waste some cycles, so I branched it off and hence also simplified some unit tests.
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Bruno Cadonna <cadonna@confluent.io>
Update to KIP-740.
Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>, Israel Ekpo <israelekpo@gmail.com>
We've seen a few failures recently due to the driver finishing
the production of data and verifying the results before the
whole cluster is even running.
Reviewers: Leah Thomas <lthomas@confluent.io>, Walker Carlson <wcarlson@confluent.io>, Matthias J. Sax <mjsax@apache.org>
Pt. 1: #10609
Pt. 2: #10683
Pt. 3: #10788
In Pt. 3 we implement the addNamedTopology API. This can be used to update the processing topology of a running Kafka Streams application without resetting the app, or even pausing/restarting the process. It's up to the user to ensure that this API is called on every instance of an application to ensure all clients are able to run the newly added NamedTopology.
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Addresses the handful of remaining feedback from Pt. 2, plus adds two new tests: one verifying a multi-topology application with a FKJ and its internal topics, another to make sure IQ works with named topologies (though note that there is a bit more work left for IQ to be fully supported, will be tackled after Pt. 3
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Add support to open endpoint range queries in key-value stores
Implements: KIP-763
Reviewers: Almog Gavra <almog@confluent.io>, Luke Chen <showuon@gmail.com>, John Roesler <vvcephei@apache.org>
Fix a bug in StoreQueryIntegrationTest::shouldQueryOnlyActivePartitionStoresByDefault that causes the test to fail in the case of a client rebalancing. The changes in this PR make sure the test keeps re-trying after a rebalancing operation, instead of failing.
Reviewers: Luke Chen <showuon@gmail.com>, John Roesler <vvcephei@apache.org>
As part of the migration of KStream/KTable operations to the new Processor API (KAFKA-8410), this PR includes the migration of KTable:
* mapValues,
* passthrough,
* and source operations.
Reviewers: John Roesler <vvcephei@apache.org>
I noticed that replace thread actions would not be logged unless the user added a log in the handler. I think it would be very useful for debugging.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Pt. 1: #10609
Pt. 2: #10683
Pt. 3: #10788
The TopologyMetadata is next up after Pt. 1 #10609. This PR sets up the basic architecture for running an app with multiple NamedTopologies, though the APIs to add/remove them dynamically are not implemented until Pt. 3
Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>